# conformance.py (17 KB)
from collections import Counter
from pprint import pprint
from typing import Any, Dict, List, Set, Tuple
from uuid import UUID

from services.bottom.V0 import Bottom
from state.base import State
  6. class Conformance:
  7. def __init__(self, state: State, scd_model: UUID, model: UUID, type_model: UUID):
  8. self.state = state
  9. self.bottom = Bottom(state)
  10. self.scd_model = scd_model
  11. self.model = model
  12. self.type_model = type_model
  13. self.type_mapping: Dict[str, str] = {}
  14. self.model_names = {
  15. # map model elements to their names to prevent iterating too much
  16. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  17. for e in self.bottom.read_keys(self.model)
  18. }
  19. self.type_model_names = {
  20. # map type model elements to their names to prevent iterating too much
  21. self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
  22. for e in self.bottom.read_keys(self.type_model)
  23. }
  24. self.sub_types: Dict[str, Set[str]] = {
  25. k: set() for k in self.bottom.read_keys(self.type_model)
  26. }
  27. self.primitive_values: Dict[UUID, Any] = {}
  28. self.abstract_types: List[str] = []
  29. self.multiplicities: Dict[str, Tuple] = {}
  30. self.source_multiplicities: Dict[str, Tuple] = {}
  31. self.target_multiplicities: Dict[str, Tuple] = {}
  32. def check_nominal(self):
  33. steps = [
  34. self.check_typing,
  35. self.check_link_typing,
  36. self.check_multiplicities,
  37. self.check_constraints
  38. ]
  39. for step in steps:
  40. conforms = step()
  41. if not conforms:
  42. return False
  43. return True
  44. def read_attribute(self, m_element: UUID, attr_name: str):
  45. def has_label(_edge: UUID, _label):
  46. elems = self.bottom.read_outgoing_elements(_edge)
  47. for elem in elems:
  48. value = self.primitive_values.get(elem, self.bottom.read_value(elem))
  49. if value is not None and value == _label:
  50. return True
  51. return False
  52. def get_outgoing_edge_by_label(_element: UUID, _label):
  53. edges = self.bottom.read_outgoing_edges(_element)
  54. for e in edges:
  55. if has_label(e, _label):
  56. return e
  57. outgoing = self.bottom.read_outgoing_edges(m_element)
  58. for edge in outgoing:
  59. try:
  60. edge_name = self.model_names[edge]
  61. edge_type_name = self.type_mapping[edge_name]
  62. edge_type, = self.bottom.read_outgoing_elements(self.type_model, edge_type_name)
  63. edge_type_src = self.bottom.read_edge_source(edge_type)
  64. if get_outgoing_edge_by_label(edge_type_src, attr_name) == edge_type:
  65. result = self.bottom.read_edge_target(edge)
  66. return self.primitive_values.get(result, self.bottom.read_value(result))
  67. except KeyError:
  68. pass # non-model edge, e.g. morphism link
  69. def precompute_sub_types(self):
  70. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  71. inh_links = []
  72. for tm_element, tm_name in self.type_model_names.items():
  73. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  74. if inh_element in morphisms:
  75. inh_links.append(tm_element)
  76. for link in inh_links:
  77. tm_source = self.bottom.read_edge_source(link)
  78. tm_target = self.bottom.read_edge_target(link)
  79. parent_name = self.type_model_names[tm_target]
  80. child_name = self.type_model_names[tm_source]
  81. self.sub_types[parent_name].add(child_name)
  82. stop = False
  83. while not stop:
  84. stop = True
  85. for child_name, child_children in self.sub_types.items():
  86. for parent_name, parent_children in self.sub_types.items():
  87. if child_name in parent_children:
  88. original_size = len(parent_children)
  89. parent_children.update(child_children)
  90. if len(parent_children) != original_size:
  91. stop = False
  92. def deref_primitive_values(self):
  93. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  94. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  95. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  96. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  97. t_deref = []
  98. t_refs = []
  99. for tm_element, tm_name in self.type_model_names.items():
  100. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  101. if ref_element in morphisms:
  102. t_refs.append(self.type_model_names[tm_element])
  103. elif string_element in morphisms:
  104. t_deref.append(tm_element)
  105. elif boolean_element in morphisms:
  106. t_deref.append(tm_element)
  107. elif integer_element in morphisms:
  108. t_deref.append(tm_element)
  109. for elem in t_deref:
  110. primitive_model = UUID(self.bottom.read_value(elem))
  111. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  112. primitive_value = self.bottom.read_value(primitive_value_node)
  113. self.primitive_values[elem] = primitive_value
  114. for m_name, tm_name in self.type_mapping.items():
  115. if tm_name in t_refs:
  116. # dereference
  117. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  118. primitive_model = UUID(self.bottom.read_value(m_element))
  119. try:
  120. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  121. primitive_value = self.bottom.read_value(primitive_value_node)
  122. self.primitive_values[m_element] = primitive_value
  123. except ValueError:
  124. pass # multiple elements in model indicate that we're not dealing with a primitive
  125. def precompute_multiplicities(self):
  126. for tm_element, tm_name in self.type_model_names.items():
  127. # class abstract flags and multiplicities
  128. abstract = self.read_attribute(tm_element, "abstract")
  129. lc = self.read_attribute(tm_element, "lower_cardinality")
  130. uc = self.read_attribute(tm_element, "upper_cardinality")
  131. if abstract:
  132. self.abstract_types.append(tm_name)
  133. if lc or uc:
  134. mult = (
  135. lc if lc is not None else float("-inf"),
  136. uc if uc is not None else float("inf")
  137. )
  138. self.multiplicities[tm_name] = mult
  139. # multiplicities for associations
  140. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  141. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  142. if slc or suc:
  143. mult = (
  144. slc if slc is not None else float("-inf"),
  145. suc if suc is not None else float("inf")
  146. )
  147. self.source_multiplicities[tm_name] = mult
  148. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  149. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  150. if tlc or tuc:
  151. mult = (
  152. tlc if tlc is not None else float("-inf"),
  153. tuc if tuc is not None else float("inf")
  154. )
  155. self.target_multiplicities[tm_name] = mult
  156. # optional for attribute links
  157. opt = self.read_attribute(tm_element, "optional")
  158. if opt is not None:
  159. mult = (0 if opt else 1, 1)
  160. self.source_multiplicities[tm_name] = mult
  161. self.target_multiplicities[tm_name] = mult
  162. def get_type(self, element: UUID):
  163. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  164. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  165. return tm_element
  166. def check_typing(self):
  167. """
  168. for each element of model check whether a morphism
  169. link exists to some element of type_model
  170. """
  171. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  172. model_names = self.bottom.read_keys(self.model)
  173. for m_name in model_names:
  174. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  175. try:
  176. tm_element = self.get_type(m_element)
  177. tm_name = self.type_model_names[tm_element]
  178. self.type_mapping[m_name] = tm_name
  179. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  180. sub_m = UUID(self.bottom.read_value(m_element))
  181. sub_tm = UUID(self.bottom.read_value(tm_element))
  182. if not Conformance(self.state, self.scd_model, sub_m, sub_tm).check_nominal():
  183. return False
  184. except ValueError:
  185. # no or too many morphism links found
  186. print(f"Incorrectly typed element: {m_name}")
  187. return False
  188. return True
  189. def check_link_typing(self):
  190. self.precompute_sub_types()
  191. for m_name, tm_name in self.type_mapping.items():
  192. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  193. m_source = self.bottom.read_edge_source(m_element)
  194. m_target = self.bottom.read_edge_target(m_element)
  195. if m_source is None or m_target is None:
  196. # element is not a link
  197. continue
  198. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  199. tm_source = self.bottom.read_edge_source(tm_element)
  200. tm_target = self.bottom.read_edge_target(tm_element)
  201. # check if source is typed correctly
  202. source_name = self.model_names[m_source]
  203. source_type_actual = self.type_mapping[source_name]
  204. source_type_expected = self.type_model_names[tm_source]
  205. if source_type_actual != source_type_expected:
  206. if source_type_actual not in self.sub_types[source_type_expected]:
  207. print(f"Invalid source type {source_type_actual} for element {m_name}")
  208. return False
  209. # check if target is typed correctly
  210. target_name = self.model_names[m_target]
  211. target_type_actual = self.type_mapping[target_name]
  212. target_type_expected = self.type_model_names[tm_target]
  213. if target_type_actual != target_type_expected:
  214. if target_type_actual not in self.sub_types[target_type_expected]:
  215. print(f"Invalid target type {target_type_actual} for element {m_name}")
  216. return False
  217. return True
  218. def check_multiplicities(self):
  219. self.deref_primitive_values()
  220. self.precompute_multiplicities()
  221. for tm_name in self.type_model_names.values():
  222. # abstract classes
  223. if tm_name in self.abstract_types:
  224. type_count = list(self.type_mapping.values()).count(tm_name)
  225. if type_count > 0:
  226. print(f"Invalid instantiation of abstract class: {tm_name}")
  227. return False
  228. # class multiplicities
  229. if tm_name in self.multiplicities:
  230. lc, uc = self.multiplicities[tm_name]
  231. type_count = list(self.type_mapping.values()).count(tm_name)
  232. for sub_type in self.sub_types[tm_name]:
  233. type_count += list(self.type_mapping.values()).count(sub_type)
  234. if type_count < lc or type_count > uc:
  235. print(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  236. return False
  237. # association source multiplicities
  238. if tm_name in self.source_multiplicities:
  239. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  240. tm_source_element = self.bottom.read_edge_source(tm_element)
  241. tm_source_name = self.type_model_names[tm_source_element]
  242. lc, uc = self.source_multiplicities[tm_name]
  243. for i, t in self.type_mapping.items():
  244. if t == tm_source_name or t in self.sub_types[tm_source_name]:
  245. count = 0
  246. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  247. outgoing = self.bottom.read_outgoing_edges(i_element)
  248. for o in outgoing:
  249. try:
  250. if self.type_mapping[self.model_names[o]] == tm_name:
  251. count += 1
  252. except KeyError:
  253. pass # for elements not part of model, e.g. morphism links
  254. if count < lc or count > uc:
  255. print(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  256. return False
  257. # association target multiplicities
  258. if tm_name in self.target_multiplicities:
  259. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  260. tm_target_element = self.bottom.read_edge_source(tm_element)
  261. tm_target_name = self.type_model_names[tm_target_element]
  262. lc, uc = self.target_multiplicities[tm_name]
  263. for i, t in self.type_mapping.items():
  264. if t == tm_target_name or t in self.sub_types[tm_target_name]:
  265. count = 0
  266. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  267. outgoing = self.bottom.read_outgoing_edges(i_element)
  268. for o in outgoing:
  269. try:
  270. if self.type_mapping[self.model_names[o]] == tm_name:
  271. count += 1
  272. except KeyError:
  273. pass # for elements not part of model, e.g. morphism links
  274. if count < lc or count > uc:
  275. print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  276. return False
  277. return True
  278. def check_constraints(self):
  279. # local constraints
  280. for m_name, tm_name in self.type_mapping.items():
  281. if tm_name != "GlobalConstraint":
  282. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  283. code = self.read_attribute(tm_element, "constraint")
  284. print(code)
  285. # global constraints
  286. for m_name, tm_name in self.type_mapping.items():
  287. if tm_name == "GlobalConstraint":
  288. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  289. code = self.read_attribute(tm_element, "constraint")
  290. print(code)
  291. return True
  292. def __create_pn(state: State):
  293. from services.scd import SCD
  294. # Retrieve refs to primitive type models
  295. # # integer
  296. int_type_id = state.read_dict(state.read_root(), "Integer")
  297. int_type = UUID(state.read_value(int_type_id))
  298. # # string
  299. str_type_id = state.read_dict(state.read_root(), "String")
  300. str_type = UUID(state.read_value(str_type_id))
  301. # Create LTM_PN
  302. model_uuid = state.create_node()
  303. service = SCD(scd, model_uuid, state)
  304. # Create classes
  305. service.create_class("P")
  306. service.create_class("T")
  307. # Create associations
  308. service.create_association("P2T", "P", "T")
  309. service.create_association("T2P", "T", "P")
  310. # Create model refs
  311. service.create_model_ref("Integer", int_type)
  312. service.create_model_ref("String", int_type)
  313. # Create class attributes
  314. service.create_attribute_link("P", "Integer", "t", False)
  315. service.create_attribute_link("P", "String", "n", False)
  316. service.create_attribute_link("T", "String", "n", False)
  317. # Create association attributes
  318. service.create_attribute_link("P2T", "Integer", "w", False)
  319. service.create_attribute_link("T2P", "Integer", "w", False)
  320. # Create test constraint
  321. service.add_constraint("P", "print(element)\nreturn True")
  322. return model_uuid
  323. if __name__ == '__main__':
  324. from state.devstate import DevState as State
  325. s = State()
  326. from bootstrap.scd import bootstrap_scd
  327. scd = bootstrap_scd(s)
  328. pn = __create_pn(s)
  329. # cf = Conformance(s, scd, scd, scd)
  330. # cf.check_nominal()
  331. cf = Conformance(s, scd, pn, scd)
  332. cf.check_nominal()