# conformance.py (26 KB)


from collections import Counter
from pprint import pprint
from typing import Any, Dict, List, Set, Tuple
from uuid import UUID

from services.bottom.V0 import Bottom
from state.base import State
  6. class Conformance:
  7. def __init__(self, state: State, scd_model: UUID, model: UUID, type_model: UUID):
  8. self.state = state
  9. self.bottom = Bottom(state)
  10. self.scd_model = scd_model
  11. self.model = model
  12. self.type_model = type_model
  13. self.type_mapping: Dict[str, str] = {}
  14. self.model_names = {
  15. # map model elements to their names to prevent iterating too much
  16. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  17. for e in self.bottom.read_keys(self.model)
  18. }
  19. self.type_model_names = {
  20. # map type model elements to their names to prevent iterating too much
  21. self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
  22. for e in self.bottom.read_keys(self.type_model)
  23. }
  24. self.sub_types: Dict[str, Set[str]] = {
  25. k: set() for k in self.bottom.read_keys(self.type_model)
  26. }
  27. self.primitive_values: Dict[UUID, Any] = {}
  28. self.abstract_types: List[str] = []
  29. self.multiplicities: Dict[str, Tuple] = {}
  30. self.source_multiplicities: Dict[str, Tuple] = {}
  31. self.target_multiplicities: Dict[str, Tuple] = {}
  32. self.structures = {}
  33. self.matches = {}
  34. self.candidates = {}
  35. def check_nominal(self):
  36. steps = [
  37. self.check_typing,
  38. self.check_link_typing,
  39. self.check_multiplicities,
  40. self.check_constraints
  41. ]
  42. for step in steps:
  43. conforms = step()
  44. if not conforms:
  45. return False
  46. return True
  47. def read_attribute(self, element: UUID, attr_name: str):
  48. if element in self.type_model_names:
  49. # type model element
  50. element_name = self.type_model_names[element]
  51. model = self.type_model
  52. else:
  53. # model element
  54. element_name = self.model_names[element]
  55. model = self.model
  56. try:
  57. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  58. return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem))
  59. except ValueError:
  60. return None
  61. def precompute_sub_types(self):
  62. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  63. inh_links = []
  64. for tm_element, tm_name in self.type_model_names.items():
  65. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  66. if inh_element in morphisms:
  67. inh_links.append(tm_element)
  68. for link in inh_links:
  69. tm_source = self.bottom.read_edge_source(link)
  70. tm_target = self.bottom.read_edge_target(link)
  71. parent_name = self.type_model_names[tm_target]
  72. child_name = self.type_model_names[tm_source]
  73. self.sub_types[parent_name].add(child_name)
  74. stop = False
  75. while not stop:
  76. stop = True
  77. for child_name, child_children in self.sub_types.items():
  78. for parent_name, parent_children in self.sub_types.items():
  79. if child_name in parent_children:
  80. original_size = len(parent_children)
  81. parent_children.update(child_children)
  82. if len(parent_children) != original_size:
  83. stop = False
  84. def deref_primitive_values(self):
  85. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  86. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  87. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  88. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  89. t_deref = []
  90. t_refs = []
  91. for tm_element, tm_name in self.type_model_names.items():
  92. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  93. if ref_element in morphisms:
  94. t_refs.append(self.type_model_names[tm_element])
  95. elif string_element in morphisms:
  96. t_deref.append(tm_element)
  97. elif boolean_element in morphisms:
  98. t_deref.append(tm_element)
  99. elif integer_element in morphisms:
  100. t_deref.append(tm_element)
  101. for elem in t_deref:
  102. primitive_model = UUID(self.bottom.read_value(elem))
  103. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  104. primitive_value = self.bottom.read_value(primitive_value_node)
  105. self.primitive_values[elem] = primitive_value
  106. for m_name, tm_name in self.type_mapping.items():
  107. if tm_name in t_refs:
  108. # dereference
  109. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  110. primitive_model = UUID(self.bottom.read_value(m_element))
  111. try:
  112. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  113. primitive_value = self.bottom.read_value(primitive_value_node)
  114. self.primitive_values[m_element] = primitive_value
  115. except ValueError:
  116. pass # multiple elements in model indicate that we're not dealing with a primitive
  117. def precompute_multiplicities(self):
  118. for tm_element, tm_name in self.type_model_names.items():
  119. # class abstract flags and multiplicities
  120. abstract = self.read_attribute(tm_element, "abstract")
  121. lc = self.read_attribute(tm_element, "lower_cardinality")
  122. uc = self.read_attribute(tm_element, "upper_cardinality")
  123. if abstract:
  124. self.abstract_types.append(tm_name)
  125. if lc or uc:
  126. mult = (
  127. lc if lc is not None else float("-inf"),
  128. uc if uc is not None else float("inf")
  129. )
  130. self.multiplicities[tm_name] = mult
  131. # multiplicities for associations
  132. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  133. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  134. if slc or suc:
  135. mult = (
  136. slc if slc is not None else float("-inf"),
  137. suc if suc is not None else float("inf")
  138. )
  139. self.source_multiplicities[tm_name] = mult
  140. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  141. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  142. if tlc or tuc:
  143. mult = (
  144. tlc if tlc is not None else float("-inf"),
  145. tuc if tuc is not None else float("inf")
  146. )
  147. self.target_multiplicities[tm_name] = mult
  148. # optional for attribute links
  149. opt = self.read_attribute(tm_element, "optional")
  150. if opt is not None:
  151. self.source_multiplicities[tm_name] = (0 if opt else 1, 1)
  152. self.target_multiplicities[tm_name] = (0, 1)
  153. def get_type(self, element: UUID):
  154. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  155. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  156. return tm_element
  157. def check_typing(self):
  158. """
  159. for each element of model check whether a morphism
  160. link exists to some element of type_model
  161. """
  162. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  163. model_names = self.bottom.read_keys(self.model)
  164. for m_name in model_names:
  165. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  166. try:
  167. tm_element = self.get_type(m_element)
  168. tm_name = self.type_model_names[tm_element]
  169. self.type_mapping[m_name] = tm_name
  170. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  171. sub_m = UUID(self.bottom.read_value(m_element))
  172. sub_tm = UUID(self.bottom.read_value(tm_element))
  173. if not Conformance(self.state, self.scd_model, sub_m, sub_tm).check_nominal():
  174. return False
  175. except ValueError:
  176. # no or too many morphism links found
  177. print(f"Incorrectly typed element: {m_name}")
  178. return False
  179. return True
  180. def check_link_typing(self):
  181. self.precompute_sub_types()
  182. for m_name, tm_name in self.type_mapping.items():
  183. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  184. m_source = self.bottom.read_edge_source(m_element)
  185. m_target = self.bottom.read_edge_target(m_element)
  186. if m_source is None or m_target is None:
  187. # element is not a link
  188. continue
  189. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  190. tm_source = self.bottom.read_edge_source(tm_element)
  191. tm_target = self.bottom.read_edge_target(tm_element)
  192. # check if source is typed correctly
  193. source_name = self.model_names[m_source]
  194. source_type_actual = self.type_mapping[source_name]
  195. source_type_expected = self.type_model_names[tm_source]
  196. if source_type_actual != source_type_expected:
  197. if source_type_actual not in self.sub_types[source_type_expected]:
  198. print(f"Invalid source type {source_type_actual} for element {m_name}")
  199. return False
  200. # check if target is typed correctly
  201. target_name = self.model_names[m_target]
  202. target_type_actual = self.type_mapping[target_name]
  203. target_type_expected = self.type_model_names[tm_target]
  204. if target_type_actual != target_type_expected:
  205. if target_type_actual not in self.sub_types[target_type_expected]:
  206. print(f"Invalid target type {target_type_actual} for element {m_name}")
  207. return False
  208. return True
  209. def check_multiplicities(self):
  210. self.deref_primitive_values()
  211. self.precompute_multiplicities()
  212. for tm_name in self.type_model_names.values():
  213. # abstract classes
  214. if tm_name in self.abstract_types:
  215. type_count = list(self.type_mapping.values()).count(tm_name)
  216. if type_count > 0:
  217. print(f"Invalid instantiation of abstract class: {tm_name}")
  218. return False
  219. # class multiplicities
  220. if tm_name in self.multiplicities:
  221. lc, uc = self.multiplicities[tm_name]
  222. type_count = list(self.type_mapping.values()).count(tm_name)
  223. for sub_type in self.sub_types[tm_name]:
  224. type_count += list(self.type_mapping.values()).count(sub_type)
  225. if type_count < lc or type_count > uc:
  226. print(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  227. return False
  228. # association source multiplicities
  229. if tm_name in self.source_multiplicities:
  230. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  231. tm_source_element = self.bottom.read_edge_source(tm_element)
  232. tm_source_name = self.type_model_names[tm_source_element]
  233. lc, uc = self.source_multiplicities[tm_name]
  234. for i, t in self.type_mapping.items():
  235. if t == tm_source_name or t in self.sub_types[tm_source_name]:
  236. count = 0
  237. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  238. outgoing = self.bottom.read_outgoing_edges(i_element)
  239. for o in outgoing:
  240. try:
  241. if self.type_mapping[self.model_names[o]] == tm_name:
  242. count += 1
  243. except KeyError:
  244. pass # for elements not part of model, e.g. morphism links
  245. if count < lc or count > uc:
  246. print(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  247. return False
  248. # association target multiplicities
  249. if tm_name in self.target_multiplicities:
  250. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  251. tm_target_element = self.bottom.read_edge_target(tm_element)
  252. tm_target_name = self.type_model_names[tm_target_element]
  253. lc, uc = self.target_multiplicities[tm_name]
  254. for i, t in self.type_mapping.items():
  255. if t == tm_target_name or t in self.sub_types[tm_target_name]:
  256. count = 0
  257. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  258. outgoing = self.bottom.read_incoming_edges(i_element)
  259. for o in outgoing:
  260. try:
  261. if self.type_mapping[self.model_names[o]] == tm_name:
  262. count += 1
  263. except KeyError:
  264. pass # for elements not part of model, e.g. morphism links
  265. if count < lc or count > uc:
  266. print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  267. return False
  268. return True
  269. def evaluate_constraint(self, code, **kwargs):
  270. funcs = {
  271. 'read_value': self.state.read_value
  272. }
  273. return eval(
  274. code,
  275. {'__builtins__': {'isinstance': isinstance, 'print': print,
  276. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple}
  277. }, # globals
  278. {**kwargs, **funcs} # locals
  279. )
  280. def check_constraints(self):
  281. # local constraints
  282. for m_name, tm_name in self.type_mapping.items():
  283. if tm_name != "GlobalConstraint":
  284. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  285. code = self.read_attribute(tm_element, "constraint")
  286. if code is not None:
  287. morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
  288. morphisms = [m for m in morphisms if m in self.model_names]
  289. for m_element in morphisms:
  290. if not self.evaluate_constraint(code, element=m_element):
  291. return False
  292. # global constraints
  293. for m_name, tm_name in self.type_mapping.items():
  294. if tm_name == "GlobalConstraint":
  295. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  296. code = self.read_attribute(tm_element, "constraint")
  297. if code is not None:
  298. if not self.evaluate_constraint(code, model=self.model):
  299. return False
  300. return True
  301. def precompute_structures(self):
  302. self.precompute_sub_types()
  303. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  304. # collect types
  305. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  306. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  307. for tm_element, tm_name in self.type_model_names.items():
  308. # retrieve elements that tm_element is a morphism of
  309. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  310. morphism, = [m for m in morphisms if m in scd_elements]
  311. # check if tm_element is a morphism of AttributeLink
  312. if class_element == morphism or association_element == morphism:
  313. self.structures[tm_name] = set()
  314. # collect type structures
  315. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  316. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  317. for tm_element, tm_name in self.type_model_names.items():
  318. # retrieve elements that tm_element is a morphism of
  319. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  320. morphism, = [m for m in morphisms if m in scd_elements]
  321. # check if tm_element is a morphism of AttributeLink
  322. if attr_link_element == morphism:
  323. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  324. attrs = self.bottom.read_outgoing_elements(tm_element)
  325. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  326. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  327. # get attr name value
  328. name_model = UUID(self.bottom.read_value(name_model_node))
  329. name_node, = self.bottom.read_outgoing_elements(name_model)
  330. name = self.bottom.read_value(name_node)
  331. # get attr opt value
  332. opt_model = UUID(self.bottom.read_value(opt_model_node))
  333. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  334. opt = self.bottom.read_value(opt_node)
  335. # get attr type name
  336. source_type_node = self.bottom.read_edge_source(tm_element)
  337. source_type_name = self.type_model_names[source_type_node]
  338. target_type_node = self.bottom.read_edge_target(tm_element)
  339. target_type_name = self.type_model_names[target_type_node]
  340. # add attribute to the structure of its source type
  341. # attribute is stored as a (name, optional, type) triple
  342. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  343. # extend structures of sub types with attrs of super types
  344. for super_type, sub_types in self.sub_types.items():
  345. for sub_type in sub_types:
  346. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  347. # filter out abstract types, as they cannot be instantiated
  348. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  349. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  350. for tm_element, tm_name in self.type_model_names.items():
  351. # retrieve elements that tm_element is a morphism of
  352. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  353. morphism, = [m for m in morphisms if m in scd_elements]
  354. # check if tm_element is a morphism of Class_abstract
  355. if class_abs_element == morphism:
  356. # retrieve 'abstract' attribute value
  357. target_node = self.bottom.read_edge_target(tm_element)
  358. abst_model = UUID(self.bottom.read_value(target_node))
  359. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  360. is_abstract = self.bottom.read_value(abst_node)
  361. # retrieve type name
  362. source_node = self.bottom.read_edge_source(tm_element)
  363. type_name = self.type_model_names[source_node]
  364. if is_abstract:
  365. self.structures.pop(type_name)
  366. def match_structures(self):
  367. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  368. # matching
  369. for m_element, m_name in self.model_names.items():
  370. is_edge = self.bottom.read_edge_source(m_element) is not None
  371. for type_name, structure in self.structures.items():
  372. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  373. type_is_edge = self.bottom.read_edge_source(tm_element) is not None
  374. if is_edge == type_is_edge:
  375. matched = 0
  376. for name, optional, attr_type in structure:
  377. try:
  378. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  379. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  380. # if attribute is a modelref, we need to check whether it
  381. # linguistically conforms to the specified type
  382. # if its an internally defined attribute, this will be checked by constraints
  383. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  384. attr_conforms = True
  385. if ref_element in morphisms:
  386. # check conformance of reference model
  387. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  388. model_uuid = UUID(self.bottom.read_value(attr))
  389. attr_conforms = Conformance(self.state, self.scd_model, model_uuid, type_model_uuid)\
  390. .check_nominal()
  391. else:
  392. # eval constraints
  393. code = self.read_attribute(attr_tm, "constraint")
  394. if code is not None:
  395. attr_conforms = self.evaluate_constraint(code, element=attr)
  396. if attr_conforms:
  397. matched += 1
  398. except ValueError:
  399. # attr not found or failed parsing UUID
  400. if optional:
  401. continue
  402. else:
  403. break
  404. if matched == len(structure):
  405. self.candidates.setdefault(m_name, set()).add(type_name)
  406. # filter out candidates for links based on source and target types
  407. for m_element, m_name in self.model_names.items():
  408. is_edge = self.bottom.read_edge_source(m_element) is not None
  409. if is_edge and m_name in self.candidates:
  410. m_source = self.bottom.read_edge_source(m_element)
  411. m_target = self.bottom.read_edge_target(m_element)
  412. source_candidates = self.candidates[self.model_names[m_source]]
  413. target_candidates = self.candidates[self.model_names[m_target]]
  414. remove = set()
  415. for candidate_name in self.candidates[m_name]:
  416. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  417. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  418. if candidate_source not in source_candidates:
  419. remove.add(candidate_name)
  420. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  421. if candidate_target not in target_candidates:
  422. remove.add(candidate_name)
  423. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  424. def build_morphisms(self):
  425. if not all([len(c) == 1 for c in self.candidates.values()]):
  426. print("Ambiguous structural matches found, unable to build unique morphism.")
  427. return False
  428. mapping = {k: v.pop() for k, v in self.candidates.items()}
  429. for m_name, tm_name in mapping.items():
  430. # morphism to class/assoc
  431. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  432. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  433. self.bottom.create_edge(m_element, tm_element, "Morphism")
  434. # morphism for attributes and attribute links
  435. structure = self.structures[tm_name]
  436. for attr_name, _, attr_type in structure:
  437. try:
  438. # attribute node
  439. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  440. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  441. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  442. # attribute link
  443. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}_link")
  444. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  445. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  446. except ValueError:
  447. pass
  448. if __name__ == '__main__':
  449. from state.devstate import DevState as State
  450. s = State()
  451. from bootstrap.scd import bootstrap_scd
  452. scd = bootstrap_scd(s)
  453. from bootstrap.pn import bootstrap_pn
  454. ltm_pn = bootstrap_pn(s, "PN")
  455. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  456. from services.pn import PN
  457. my_pn = s.create_node()
  458. PNserv = PN(ltm_pn, my_pn, s)
  459. PNserv.create_place("p1", 5)
  460. PNserv.create_place("p2", 0)
  461. PNserv.create_transition("t1")
  462. PNserv.create_p2t("p1", "t1", 1)
  463. PNserv.create_t2p("t1", "p2", 1)
  464. cf = Conformance(s, scd, my_pn, ltm_pn_lola)
  465. # cf = Conformance(s, scd, ltm_pn, scd)
  466. cf.precompute_structures()
  467. cf.match_structures()
  468. cf.build_morphisms()
  469. print(cf.check_nominal())