conformance.py 22 KB


  1. from services.bottom.V0 import Bottom
  2. from uuid import UUID
  3. from state.base import State
  4. from typing import Dict, Tuple, Set, Any, List
  5. from pprint import pprint
  6. class Conformance:
  7. def __init__(self, state: State, scd_model: UUID, model: UUID, type_model: UUID):
  8. self.state = state
  9. self.bottom = Bottom(state)
  10. self.scd_model = scd_model
  11. self.model = model
  12. self.type_model = type_model
  13. self.type_mapping: Dict[str, str] = {}
  14. self.model_names = {
  15. # map model elements to their names to prevent iterating too much
  16. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  17. for e in self.bottom.read_keys(self.model)
  18. }
  19. self.type_model_names = {
  20. # map type model elements to their names to prevent iterating too much
  21. self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
  22. for e in self.bottom.read_keys(self.type_model)
  23. }
  24. self.sub_types: Dict[str, Set[str]] = {
  25. k: set() for k in self.bottom.read_keys(self.type_model)
  26. }
  27. self.primitive_values: Dict[UUID, Any] = {}
  28. self.abstract_types: List[str] = []
  29. self.multiplicities: Dict[str, Tuple] = {}
  30. self.source_multiplicities: Dict[str, Tuple] = {}
  31. self.target_multiplicities: Dict[str, Tuple] = {}
  32. self.structures = {}
  33. self.matches = {}
  34. self.candidates = {}
  35. def check_nominal(self):
  36. steps = [
  37. self.check_typing,
  38. self.check_link_typing,
  39. self.check_multiplicities,
  40. self.check_constraints
  41. ]
  42. for step in steps:
  43. conforms = step()
  44. if not conforms:
  45. return False
  46. return True
  47. def read_attribute(self, element: UUID, attr_name: str):
  48. if element in self.type_model_names:
  49. # type model element
  50. element_name = self.type_model_names[element]
  51. model = self.type_model
  52. else:
  53. # model element
  54. element_name = self.model_names[element]
  55. model = self.model
  56. try:
  57. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  58. return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem))
  59. except ValueError:
  60. return None
  61. def precompute_sub_types(self):
  62. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  63. inh_links = []
  64. for tm_element, tm_name in self.type_model_names.items():
  65. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  66. if inh_element in morphisms:
  67. inh_links.append(tm_element)
  68. for link in inh_links:
  69. tm_source = self.bottom.read_edge_source(link)
  70. tm_target = self.bottom.read_edge_target(link)
  71. parent_name = self.type_model_names[tm_target]
  72. child_name = self.type_model_names[tm_source]
  73. self.sub_types[parent_name].add(child_name)
  74. stop = False
  75. while not stop:
  76. stop = True
  77. for child_name, child_children in self.sub_types.items():
  78. for parent_name, parent_children in self.sub_types.items():
  79. if child_name in parent_children:
  80. original_size = len(parent_children)
  81. parent_children.update(child_children)
  82. if len(parent_children) != original_size:
  83. stop = False
  84. def deref_primitive_values(self):
  85. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  86. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  87. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  88. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  89. t_deref = []
  90. t_refs = []
  91. for tm_element, tm_name in self.type_model_names.items():
  92. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  93. if ref_element in morphisms:
  94. t_refs.append(self.type_model_names[tm_element])
  95. elif string_element in morphisms:
  96. t_deref.append(tm_element)
  97. elif boolean_element in morphisms:
  98. t_deref.append(tm_element)
  99. elif integer_element in morphisms:
  100. t_deref.append(tm_element)
  101. for elem in t_deref:
  102. primitive_model = UUID(self.bottom.read_value(elem))
  103. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  104. primitive_value = self.bottom.read_value(primitive_value_node)
  105. self.primitive_values[elem] = primitive_value
  106. for m_name, tm_name in self.type_mapping.items():
  107. if tm_name in t_refs:
  108. # dereference
  109. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  110. primitive_model = UUID(self.bottom.read_value(m_element))
  111. try:
  112. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  113. primitive_value = self.bottom.read_value(primitive_value_node)
  114. self.primitive_values[m_element] = primitive_value
  115. except ValueError:
  116. pass # multiple elements in model indicate that we're not dealing with a primitive
  117. def precompute_multiplicities(self):
  118. for tm_element, tm_name in self.type_model_names.items():
  119. # class abstract flags and multiplicities
  120. abstract = self.read_attribute(tm_element, "abstract")
  121. lc = self.read_attribute(tm_element, "lower_cardinality")
  122. uc = self.read_attribute(tm_element, "upper_cardinality")
  123. if abstract:
  124. self.abstract_types.append(tm_name)
  125. if lc or uc:
  126. mult = (
  127. lc if lc is not None else float("-inf"),
  128. uc if uc is not None else float("inf")
  129. )
  130. self.multiplicities[tm_name] = mult
  131. # multiplicities for associations
  132. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  133. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  134. if slc or suc:
  135. mult = (
  136. slc if slc is not None else float("-inf"),
  137. suc if suc is not None else float("inf")
  138. )
  139. self.source_multiplicities[tm_name] = mult
  140. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  141. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  142. if tlc or tuc:
  143. mult = (
  144. tlc if tlc is not None else float("-inf"),
  145. tuc if tuc is not None else float("inf")
  146. )
  147. self.target_multiplicities[tm_name] = mult
  148. # optional for attribute links
  149. opt = self.read_attribute(tm_element, "optional")
  150. if opt is not None:
  151. mult = (0 if opt else 1, 1)
  152. self.source_multiplicities[tm_name] = mult
  153. self.target_multiplicities[tm_name] = mult
  154. def get_type(self, element: UUID):
  155. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  156. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  157. return tm_element
  158. def check_typing(self):
  159. """
  160. for each element of model check whether a morphism
  161. link exists to some element of type_model
  162. """
  163. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  164. model_names = self.bottom.read_keys(self.model)
  165. for m_name in model_names:
  166. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  167. try:
  168. tm_element = self.get_type(m_element)
  169. tm_name = self.type_model_names[tm_element]
  170. self.type_mapping[m_name] = tm_name
  171. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  172. sub_m = UUID(self.bottom.read_value(m_element))
  173. sub_tm = UUID(self.bottom.read_value(tm_element))
  174. if not Conformance(self.state, self.scd_model, sub_m, sub_tm).check_nominal():
  175. return False
  176. except ValueError:
  177. # no or too many morphism links found
  178. print(f"Incorrectly typed element: {m_name}")
  179. return False
  180. return True
  181. def check_link_typing(self):
  182. self.precompute_sub_types()
  183. for m_name, tm_name in self.type_mapping.items():
  184. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  185. m_source = self.bottom.read_edge_source(m_element)
  186. m_target = self.bottom.read_edge_target(m_element)
  187. if m_source is None or m_target is None:
  188. # element is not a link
  189. continue
  190. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  191. tm_source = self.bottom.read_edge_source(tm_element)
  192. tm_target = self.bottom.read_edge_target(tm_element)
  193. # check if source is typed correctly
  194. source_name = self.model_names[m_source]
  195. source_type_actual = self.type_mapping[source_name]
  196. source_type_expected = self.type_model_names[tm_source]
  197. if source_type_actual != source_type_expected:
  198. if source_type_actual not in self.sub_types[source_type_expected]:
  199. print(f"Invalid source type {source_type_actual} for element {m_name}")
  200. return False
  201. # check if target is typed correctly
  202. target_name = self.model_names[m_target]
  203. target_type_actual = self.type_mapping[target_name]
  204. target_type_expected = self.type_model_names[tm_target]
  205. if target_type_actual != target_type_expected:
  206. if target_type_actual not in self.sub_types[target_type_expected]:
  207. print(f"Invalid target type {target_type_actual} for element {m_name}")
  208. return False
  209. return True
  210. def check_multiplicities(self):
  211. self.deref_primitive_values()
  212. self.precompute_multiplicities()
  213. for tm_name in self.type_model_names.values():
  214. # abstract classes
  215. if tm_name in self.abstract_types:
  216. type_count = list(self.type_mapping.values()).count(tm_name)
  217. if type_count > 0:
  218. print(f"Invalid instantiation of abstract class: {tm_name}")
  219. return False
  220. # class multiplicities
  221. if tm_name in self.multiplicities:
  222. lc, uc = self.multiplicities[tm_name]
  223. type_count = list(self.type_mapping.values()).count(tm_name)
  224. for sub_type in self.sub_types[tm_name]:
  225. type_count += list(self.type_mapping.values()).count(sub_type)
  226. if type_count < lc or type_count > uc:
  227. print(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  228. return False
  229. # association source multiplicities
  230. if tm_name in self.source_multiplicities:
  231. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  232. tm_source_element = self.bottom.read_edge_source(tm_element)
  233. tm_source_name = self.type_model_names[tm_source_element]
  234. lc, uc = self.source_multiplicities[tm_name]
  235. for i, t in self.type_mapping.items():
  236. if t == tm_source_name or t in self.sub_types[tm_source_name]:
  237. count = 0
  238. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  239. outgoing = self.bottom.read_outgoing_edges(i_element)
  240. for o in outgoing:
  241. try:
  242. if self.type_mapping[self.model_names[o]] == tm_name:
  243. count += 1
  244. except KeyError:
  245. pass # for elements not part of model, e.g. morphism links
  246. if count < lc or count > uc:
  247. print(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  248. return False
  249. # association target multiplicities
  250. if tm_name in self.target_multiplicities:
  251. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  252. tm_target_element = self.bottom.read_edge_source(tm_element)
  253. tm_target_name = self.type_model_names[tm_target_element]
  254. lc, uc = self.target_multiplicities[tm_name]
  255. for i, t in self.type_mapping.items():
  256. if t == tm_target_name or t in self.sub_types[tm_target_name]:
  257. count = 0
  258. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  259. outgoing = self.bottom.read_outgoing_edges(i_element)
  260. for o in outgoing:
  261. try:
  262. if self.type_mapping[self.model_names[o]] == tm_name:
  263. count += 1
  264. except KeyError:
  265. pass # for elements not part of model, e.g. morphism links
  266. if count < lc or count > uc:
  267. print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  268. return False
  269. return True
  270. def evaluate_constraint(self, code, **kwargs):
  271. funcs = {
  272. 'read_value': self.state.read_value
  273. }
  274. return eval(
  275. code,
  276. {'__builtins__': {'isinstance': isinstance, 'print': print,
  277. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple}
  278. }, # globals
  279. {**kwargs, **funcs} # locals
  280. )
  281. def check_constraints(self):
  282. # local constraints
  283. for m_name, tm_name in self.type_mapping.items():
  284. if tm_name != "GlobalConstraint":
  285. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  286. code = self.read_attribute(tm_element, "constraint")
  287. if code is not None:
  288. morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
  289. morphisms = [m for m in morphisms if m in self.model_names]
  290. for m_element in morphisms:
  291. if not self.evaluate_constraint(code, element=m_element):
  292. return False
  293. # global constraints
  294. for m_name, tm_name in self.type_mapping.items():
  295. if tm_name == "GlobalConstraint":
  296. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  297. code = self.read_attribute(tm_element, "constraint")
  298. if code is not None:
  299. if not self.evaluate_constraint(code, model=self.model):
  300. return False
  301. return True
  302. def precompute_structures(self):
  303. self.precompute_sub_types()
  304. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  305. # collect types
  306. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  307. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  308. for tm_element, tm_name in self.type_model_names.items():
  309. # retrieve elements that tm_element is a morphism of
  310. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  311. morphism, = [m for m in morphisms if m in scd_elements]
  312. # check if tm_element is a morphism of AttributeLink
  313. if class_element == morphism or association_element == morphism:
  314. self.structures[tm_name] = set()
  315. # collect type structures
  316. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  317. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  318. for tm_element, tm_name in self.type_model_names.items():
  319. # retrieve elements that tm_element is a morphism of
  320. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  321. morphism, = [m for m in morphisms if m in scd_elements]
  322. # check if tm_element is a morphism of AttributeLink
  323. if attr_link_element == morphism:
  324. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  325. attrs = self.bottom.read_outgoing_elements(tm_element)
  326. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  327. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  328. # get attr name value
  329. name_model = UUID(self.bottom.read_value(name_model_node))
  330. name_node, = self.bottom.read_outgoing_elements(name_model)
  331. name = self.bottom.read_value(name_node)
  332. # get attr opt value
  333. opt_model = UUID(self.bottom.read_value(opt_model_node))
  334. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  335. opt = self.bottom.read_value(opt_node)
  336. # get attr type name
  337. source_type_node = self.bottom.read_edge_source(tm_element)
  338. source_type_name = self.type_model_names[source_type_node]
  339. target_type_node = self.bottom.read_edge_target(tm_element)
  340. target_type_name = self.type_model_names[target_type_node]
  341. # add attribute to the structure of its source type
  342. # attribute is stored as a (name, optional, type) triple
  343. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  344. # extend structures of sub types with attrs of super types
  345. for super_type, sub_types in self.sub_types.items():
  346. for sub_type in sub_types:
  347. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  348. # filter out abstract types, as they cannot be instantiated
  349. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  350. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  351. for tm_element, tm_name in self.type_model_names.items():
  352. # retrieve elements that tm_element is a morphism of
  353. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  354. morphism, = [m for m in morphisms if m in scd_elements]
  355. # check if tm_element is a morphism of Class_abstract
  356. if class_abs_element == morphism:
  357. # retrieve 'abstract' attribute value
  358. target_node = self.bottom.read_edge_target(tm_element)
  359. abst_model = UUID(self.bottom.read_value(target_node))
  360. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  361. is_abstract = self.bottom.read_value(abst_node)
  362. # retrieve type name
  363. source_node = self.bottom.read_edge_source(tm_element)
  364. type_name = self.type_model_names[source_node]
  365. if is_abstract:
  366. self.structures.pop(type_name)
  367. def match_structures(self):
  368. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  369. # match nodes
  370. for m_element, m_name in self.model_names.items():
  371. self.candidates[m_name] = set()
  372. is_edge = self.bottom.read_edge_source(m_element) is not None
  373. for type_name, structure in self.structures.items():
  374. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  375. type_is_edge = self.bottom.read_edge_source(tm_element) is not None
  376. if is_edge == type_is_edge:
  377. matched = []
  378. for name, optional, attr_type in structure:
  379. try:
  380. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  381. # if attribute is a modelref, we need to check whether it
  382. # linguistically conforms to the specified type
  383. # if its an internlly defined attribute, this will be checked by constraints later
  384. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  385. if ref_element in morphisms:
  386. pass
  387. except ValueError:
  388. if optional:
  389. continue
  390. else:
  391. break
  392. if len(matched) == len(structure):
  393. self.candidates[m_name].add(type_name)
  394. def build_morphisms(self):
  395. pass
  396. if __name__ == '__main__':
  397. from state.devstate import DevState as State
  398. s = State()
  399. from bootstrap.scd import bootstrap_scd
  400. scd = bootstrap_scd(s)
  401. from bootstrap.pn import bootstrap_pn
  402. ltm_pn = bootstrap_pn(s, "PN")
  403. from services.pn import PN
  404. my_pn = s.create_node()
  405. PNserv = PN(ltm_pn, my_pn, s)
  406. PNserv.create_place("p1", 5)
  407. PNserv.create_place("p2", 0)
  408. PNserv.create_transition("t1")
  409. PNserv.create_p2t("p1", "t1", 1)
  410. PNserv.create_t2p("t1", "p2", 1)
  411. cf = Conformance(s, scd, my_pn, ltm_pn)
  412. # cf = Conformance(s, scd, ltm_pn, scd)
  413. print(cf.check_nominal())
  414. # cf.precompute_structures()
  415. # cf.match_structures()