conformance.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. from services.bottom.V0 import Bottom
  2. from uuid import UUID
  3. from state.base import State
  4. from typing import Dict, Tuple, Set, Any, List
  5. from pprint import pprint
  6. class Conformance:
  7. def __init__(self, state: State, scd_model: UUID, model: UUID, type_model: UUID):
  8. self.state = state
  9. self.bottom = Bottom(state)
  10. self.scd_model = scd_model
  11. self.model = model
  12. self.type_model = type_model
  13. self.type_mapping: Dict[str, str] = {}
  14. self.model_names = {
  15. # map model elements to their names to prevent iterating too much
  16. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  17. for e in self.bottom.read_keys(self.model)
  18. }
  19. self.type_model_names = {
  20. # map type model elements to their names to prevent iterating too much
  21. self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
  22. for e in self.bottom.read_keys(self.type_model)
  23. }
  24. self.sub_types: Dict[str, Set[str]] = {
  25. k: set() for k in self.bottom.read_keys(self.type_model)
  26. }
  27. self.primitive_values: Dict[UUID, Any] = {}
  28. self.abstract_types: List[str] = []
  29. self.multiplicities: Dict[str, Tuple] = {}
  30. self.source_multiplicities: Dict[str, Tuple] = {}
  31. self.target_multiplicities: Dict[str, Tuple] = {}
  32. self.structures = {}
  33. self.matches = {}
  34. self.candidates = {}
  35. def check_nominal(self):
  36. steps = [
  37. self.check_typing,
  38. self.check_link_typing,
  39. self.check_multiplicities,
  40. self.check_constraints
  41. ]
  42. for step in steps:
  43. conforms = step()
  44. if not conforms:
  45. return False
  46. return True
  47. def read_attribute(self, m_element: UUID, attr_name: str):
  48. def has_label(_edge: UUID, _label):
  49. elems = self.bottom.read_outgoing_elements(_edge)
  50. for elem in elems:
  51. value = self.primitive_values.get(elem, self.bottom.read_value(elem))
  52. if value is not None and value == _label:
  53. return True
  54. return False
  55. def get_outgoing_edge_by_label(_element: UUID, _label):
  56. edges = self.bottom.read_outgoing_edges(_element)
  57. for e in edges:
  58. if has_label(e, _label):
  59. return e
  60. outgoing = self.bottom.read_outgoing_edges(m_element)
  61. for edge in outgoing:
  62. try:
  63. edge_name = self.model_names[edge]
  64. edge_type_name = self.type_mapping[edge_name]
  65. edge_type, = self.bottom.read_outgoing_elements(self.type_model, edge_type_name)
  66. edge_type_src = self.bottom.read_edge_source(edge_type)
  67. if get_outgoing_edge_by_label(edge_type_src, attr_name) == edge_type:
  68. result = self.bottom.read_edge_target(edge)
  69. return self.primitive_values.get(result, self.bottom.read_value(result))
  70. except KeyError:
  71. pass # non-model edge, e.g. morphism link
  72. def precompute_sub_types(self):
  73. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  74. inh_links = []
  75. for tm_element, tm_name in self.type_model_names.items():
  76. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  77. if inh_element in morphisms:
  78. inh_links.append(tm_element)
  79. for link in inh_links:
  80. tm_source = self.bottom.read_edge_source(link)
  81. tm_target = self.bottom.read_edge_target(link)
  82. parent_name = self.type_model_names[tm_target]
  83. child_name = self.type_model_names[tm_source]
  84. self.sub_types[parent_name].add(child_name)
  85. stop = False
  86. while not stop:
  87. stop = True
  88. for child_name, child_children in self.sub_types.items():
  89. for parent_name, parent_children in self.sub_types.items():
  90. if child_name in parent_children:
  91. original_size = len(parent_children)
  92. parent_children.update(child_children)
  93. if len(parent_children) != original_size:
  94. stop = False
  95. def deref_primitive_values(self):
  96. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  97. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  98. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  99. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  100. t_deref = []
  101. t_refs = []
  102. for tm_element, tm_name in self.type_model_names.items():
  103. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  104. if ref_element in morphisms:
  105. t_refs.append(self.type_model_names[tm_element])
  106. elif string_element in morphisms:
  107. t_deref.append(tm_element)
  108. elif boolean_element in morphisms:
  109. t_deref.append(tm_element)
  110. elif integer_element in morphisms:
  111. t_deref.append(tm_element)
  112. for elem in t_deref:
  113. primitive_model = UUID(self.bottom.read_value(elem))
  114. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  115. primitive_value = self.bottom.read_value(primitive_value_node)
  116. self.primitive_values[elem] = primitive_value
  117. for m_name, tm_name in self.type_mapping.items():
  118. if tm_name in t_refs:
  119. # dereference
  120. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  121. primitive_model = UUID(self.bottom.read_value(m_element))
  122. try:
  123. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  124. primitive_value = self.bottom.read_value(primitive_value_node)
  125. self.primitive_values[m_element] = primitive_value
  126. except ValueError:
  127. pass # multiple elements in model indicate that we're not dealing with a primitive
  128. def precompute_multiplicities(self):
  129. for tm_element, tm_name in self.type_model_names.items():
  130. # class abstract flags and multiplicities
  131. abstract = self.read_attribute(tm_element, "abstract")
  132. lc = self.read_attribute(tm_element, "lower_cardinality")
  133. uc = self.read_attribute(tm_element, "upper_cardinality")
  134. if abstract:
  135. self.abstract_types.append(tm_name)
  136. if lc or uc:
  137. mult = (
  138. lc if lc is not None else float("-inf"),
  139. uc if uc is not None else float("inf")
  140. )
  141. self.multiplicities[tm_name] = mult
  142. # multiplicities for associations
  143. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  144. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  145. if slc or suc:
  146. mult = (
  147. slc if slc is not None else float("-inf"),
  148. suc if suc is not None else float("inf")
  149. )
  150. self.source_multiplicities[tm_name] = mult
  151. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  152. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  153. if tlc or tuc:
  154. mult = (
  155. tlc if tlc is not None else float("-inf"),
  156. tuc if tuc is not None else float("inf")
  157. )
  158. self.target_multiplicities[tm_name] = mult
  159. # optional for attribute links
  160. opt = self.read_attribute(tm_element, "optional")
  161. if opt is not None:
  162. mult = (0 if opt else 1, 1)
  163. self.source_multiplicities[tm_name] = mult
  164. self.target_multiplicities[tm_name] = mult
  165. def get_type(self, element: UUID):
  166. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  167. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  168. return tm_element
  169. def check_typing(self):
  170. """
  171. for each element of model check whether a morphism
  172. link exists to some element of type_model
  173. """
  174. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  175. model_names = self.bottom.read_keys(self.model)
  176. for m_name in model_names:
  177. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  178. try:
  179. tm_element = self.get_type(m_element)
  180. tm_name = self.type_model_names[tm_element]
  181. self.type_mapping[m_name] = tm_name
  182. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  183. sub_m = UUID(self.bottom.read_value(m_element))
  184. sub_tm = UUID(self.bottom.read_value(tm_element))
  185. if not Conformance(self.state, self.scd_model, sub_m, sub_tm).check_nominal():
  186. return False
  187. except ValueError:
  188. # no or too many morphism links found
  189. print(f"Incorrectly typed element: {m_name}")
  190. return False
  191. return True
  192. def check_link_typing(self):
  193. self.precompute_sub_types()
  194. for m_name, tm_name in self.type_mapping.items():
  195. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  196. m_source = self.bottom.read_edge_source(m_element)
  197. m_target = self.bottom.read_edge_target(m_element)
  198. if m_source is None or m_target is None:
  199. # element is not a link
  200. continue
  201. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  202. tm_source = self.bottom.read_edge_source(tm_element)
  203. tm_target = self.bottom.read_edge_target(tm_element)
  204. # check if source is typed correctly
  205. source_name = self.model_names[m_source]
  206. source_type_actual = self.type_mapping[source_name]
  207. source_type_expected = self.type_model_names[tm_source]
  208. if source_type_actual != source_type_expected:
  209. if source_type_actual not in self.sub_types[source_type_expected]:
  210. print(f"Invalid source type {source_type_actual} for element {m_name}")
  211. return False
  212. # check if target is typed correctly
  213. target_name = self.model_names[m_target]
  214. target_type_actual = self.type_mapping[target_name]
  215. target_type_expected = self.type_model_names[tm_target]
  216. if target_type_actual != target_type_expected:
  217. if target_type_actual not in self.sub_types[target_type_expected]:
  218. print(f"Invalid target type {target_type_actual} for element {m_name}")
  219. return False
  220. return True
  221. def check_multiplicities(self):
  222. self.deref_primitive_values()
  223. self.precompute_multiplicities()
  224. for tm_name in self.type_model_names.values():
  225. # abstract classes
  226. if tm_name in self.abstract_types:
  227. type_count = list(self.type_mapping.values()).count(tm_name)
  228. if type_count > 0:
  229. print(f"Invalid instantiation of abstract class: {tm_name}")
  230. return False
  231. # class multiplicities
  232. if tm_name in self.multiplicities:
  233. lc, uc = self.multiplicities[tm_name]
  234. type_count = list(self.type_mapping.values()).count(tm_name)
  235. for sub_type in self.sub_types[tm_name]:
  236. type_count += list(self.type_mapping.values()).count(sub_type)
  237. if type_count < lc or type_count > uc:
  238. print(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  239. return False
  240. # association source multiplicities
  241. if tm_name in self.source_multiplicities:
  242. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  243. tm_source_element = self.bottom.read_edge_source(tm_element)
  244. tm_source_name = self.type_model_names[tm_source_element]
  245. lc, uc = self.source_multiplicities[tm_name]
  246. for i, t in self.type_mapping.items():
  247. if t == tm_source_name or t in self.sub_types[tm_source_name]:
  248. count = 0
  249. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  250. outgoing = self.bottom.read_outgoing_edges(i_element)
  251. for o in outgoing:
  252. try:
  253. if self.type_mapping[self.model_names[o]] == tm_name:
  254. count += 1
  255. except KeyError:
  256. pass # for elements not part of model, e.g. morphism links
  257. if count < lc or count > uc:
  258. print(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  259. return False
  260. # association target multiplicities
  261. if tm_name in self.target_multiplicities:
  262. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  263. tm_target_element = self.bottom.read_edge_source(tm_element)
  264. tm_target_name = self.type_model_names[tm_target_element]
  265. lc, uc = self.target_multiplicities[tm_name]
  266. for i, t in self.type_mapping.items():
  267. if t == tm_target_name or t in self.sub_types[tm_target_name]:
  268. count = 0
  269. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  270. outgoing = self.bottom.read_outgoing_edges(i_element)
  271. for o in outgoing:
  272. try:
  273. if self.type_mapping[self.model_names[o]] == tm_name:
  274. count += 1
  275. except KeyError:
  276. pass # for elements not part of model, e.g. morphism links
  277. if count < lc or count > uc:
  278. print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  279. return False
  280. return True
  281. def check_constraints(self):
  282. # local constraints
  283. for m_name, tm_name in self.type_mapping.items():
  284. if tm_name != "GlobalConstraint":
  285. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  286. code = self.read_attribute(tm_element, "constraint")
  287. print(code)
  288. # global constraints
  289. for m_name, tm_name in self.type_mapping.items():
  290. if tm_name == "GlobalConstraint":
  291. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  292. code = self.read_attribute(tm_element, "constraint")
  293. print(code)
  294. return True
  295. def precompute_structures(self):
  296. self.precompute_sub_types()
  297. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  298. # collect types
  299. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  300. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  301. for tm_element, tm_name in self.type_model_names.items():
  302. # retrieve elements that tm_element is a morphism of
  303. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  304. morphism, = [m for m in morphisms if m in scd_elements]
  305. # check if tm_element is a morphism of AttributeLink
  306. if class_element == morphism or association_element == morphism:
  307. self.structures[tm_name] = set()
  308. # collect type structures
  309. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  310. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  311. for tm_element, tm_name in self.type_model_names.items():
  312. # retrieve elements that tm_element is a morphism of
  313. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  314. morphism, = [m for m in morphisms if m in scd_elements]
  315. # check if tm_element is a morphism of AttributeLink
  316. if attr_link_element == morphism:
  317. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  318. attrs = self.bottom.read_outgoing_elements(tm_element)
  319. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  320. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  321. # get attr name value
  322. name_model = UUID(self.bottom.read_value(name_model_node))
  323. name_node, = self.bottom.read_outgoing_elements(name_model)
  324. name = self.bottom.read_value(name_node)
  325. # get attr opt value
  326. opt_model = UUID(self.bottom.read_value(opt_model_node))
  327. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  328. opt = self.bottom.read_value(opt_node)
  329. # get attr type name
  330. source_type_node = self.bottom.read_edge_source(tm_element)
  331. source_type_name = self.type_model_names[source_type_node]
  332. target_type_node = self.bottom.read_edge_target(tm_element)
  333. target_type_name = self.type_model_names[target_type_node]
  334. # add attribute to the structure of its source type
  335. # attribute is stored as a (name, optional, type) triple
  336. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  337. # extend structures of sub types with attrs of super types
  338. for super_type, sub_types in self.sub_types.items():
  339. for sub_type in sub_types:
  340. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  341. # filter out abstract types, as they cannot be instantiated
  342. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  343. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  344. for tm_element, tm_name in self.type_model_names.items():
  345. # retrieve elements that tm_element is a morphism of
  346. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  347. morphism, = [m for m in morphisms if m in scd_elements]
  348. # check if tm_element is a morphism of Class_abstract
  349. if class_abs_element == morphism:
  350. # retrieve 'abstract' attribute value
  351. target_node = self.bottom.read_edge_target(tm_element)
  352. abst_model = UUID(self.bottom.read_value(target_node))
  353. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  354. is_abstract = self.bottom.read_value(abst_node)
  355. # retrieve type name
  356. source_node = self.bottom.read_edge_source(tm_element)
  357. type_name = self.type_model_names[source_node]
  358. if is_abstract:
  359. self.structures.pop(type_name)
  360. def match_structures(self):
  361. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  362. # match nodes
  363. for m_element, m_name in self.model_names.items():
  364. self.candidates[m_name] = set()
  365. is_edge = self.bottom.read_edge_source(m_element) is not None
  366. for type_name, structure in self.structures.items():
  367. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  368. type_is_edge = self.bottom.read_edge_source(tm_element) is not None
  369. if is_edge == type_is_edge:
  370. matched = []
  371. for name, optional, attr_type in structure:
  372. try:
  373. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  374. # if attribute is a modelref, we need to check whether it
  375. # linguistically conforms to the specified type
  376. # if its an internlly defined attribute, this will be checked by constraints later
  377. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  378. if ref_element in morphisms:
  379. pass
  380. except ValueError:
  381. if optional:
  382. continue
  383. else:
  384. break
  385. if len(matched) == len(structure):
  386. self.candidates[m_name].add(type_name)
  387. def build_morphisms(self):
  388. pass
  389. if __name__ == '__main__':
  390. from state.devstate import DevState as State
  391. s = State()
  392. from bootstrap.scd import bootstrap_scd
  393. scd = bootstrap_scd(s)
  394. from bootstrap.pn import bootstrap_pn
  395. ltm_pn = bootstrap_pn(s, "PN")
  396. from services.pn import PN
  397. my_pn = s.create_node()
  398. PNserv = PN(ltm_pn, my_pn, s)
  399. PNserv.create_place("p1", 5)
  400. PNserv.create_place("p2", 0)
  401. PNserv.create_transition("t1")
  402. PNserv.create_p2t("p1", "t1", 1)
  403. PNserv.create_p2t("t1", "p2", 1)
  404. cf = Conformance(s, scd, my_pn, ltm_pn)
  405. # cf = Conformance(s, scd, ltm_pn, scd)
  406. # cf.check_nominal()
  407. cf.precompute_structures()
  408. cf.match_structures()