scd.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. from uuid import UUID
  2. from state.base import State
  3. from services.bottom.V0 import Bottom
  4. from services.primitives.boolean_type import Boolean
  5. from services.primitives.integer_type import Integer
  6. from services.primitives.string_type import String
  7. import re
  8. class SCD:
  9. """
  10. Implements services for the simple class diagrams LTM.
  11. Implementation is done in terms of services provided by LTM-bottom
  12. """
  13. def __init__(self, model: UUID, state: State):
  14. type_model_id = state.read_dict(state.read_root(), "SCD")
  15. self.scd_model = UUID(state.read_value(type_model_id))
  16. self.model = model
  17. self.bottom = Bottom(state)
  18. def create_class(self, name: str, abstract: bool = None, min_c: int = None, max_c: int = None):
  19. """
  20. Creates an instance of a class.
  21. Args:
  22. name: the name of the class to be created
  23. abstract: indicates whether or not the class is an abstract class
  24. min_c: lower bound for class multicplicity
  25. max_c: upper bound for class multicplicity
  26. Returns:
  27. Nothing.
  28. """
  29. def set_cardinality(bound: str, value: int):
  30. """ Helper for setting cardinality attributes """
  31. # Create cardinality attribute root node
  32. # Do note that this is an instance of a ModelRef!
  33. _c_model = self.bottom.create_node()
  34. Integer(_c_model, self.bottom.state).create(value)
  35. _c_node = self.bottom.create_node(str(_c_model)) # store UUID of primitive value model
  36. self.bottom.create_edge(self.model, _c_node, f"{name}.{bound}_cardinality") # link to model root
  37. _c_link = self.bottom.create_edge(class_node, _c_node) # link class to attribute
  38. self.bottom.create_edge(self.model, _c_link, f"{name}.{bound}_cardinality_link") # link attr link to model root
  39. # retrieve types from metamodel
  40. _scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  41. _scd_link, = self.bottom.read_outgoing_elements(self.scd_model, f"Class_{bound}_cardinality")
  42. # type newly created elements
  43. self.bottom.create_edge(_c_node, _scd_node, "Morphism")
  44. self.bottom.create_edge(_c_link, _scd_link, "Morphism")
  45. # create class + attributes + morphism links
  46. class_node = self.bottom.create_node() # create class node
  47. self.bottom.create_edge(self.model, class_node, name) # attach to model
  48. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Class") # retrieve type
  49. self.bottom.create_edge(class_node, scd_node, "Morphism") # create morphism link
  50. if abstract is not None:
  51. # operations similar to set_cardinality function defined above
  52. abstract_model = self.bottom.create_node()
  53. Boolean(abstract_model, self.bottom.state).create(abstract)
  54. abstract_node = self.bottom.create_node(str(abstract_model))
  55. self.bottom.create_edge(self.model, abstract_node, f"{name}.abstract")
  56. abstract_link = self.bottom.create_edge(class_node, abstract_node)
  57. self.bottom.create_edge(self.model, abstract_link, f"{name}.abstract_link")
  58. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  59. scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  60. self.bottom.create_edge(abstract_node, scd_node, "Morphism")
  61. self.bottom.create_edge(abstract_link, scd_link, "Morphism")
  62. if min_c is not None:
  63. set_cardinality("lower", min_c)
  64. if max_c is not None:
  65. set_cardinality("upper", max_c)
  66. def create_association(self, name: str, source: str, target: str,
  67. src_min_c: int = None, src_max_c: int = None,
  68. tgt_min_c: int = None, tgt_max_c: int = None):
  69. """
  70. Creates an instance of an association.
  71. Args:
  72. name: the name of the association to be created
  73. source: the name of the source of the association
  74. target: the name of the target of the association
  75. src_min_c: lower bound for source multicplicity
  76. src_max_c: upper bound for source multicplicity
  77. tgt_min_c: lower bound for target multicplicity
  78. tgt_max_c: upper bound for target multicplicity
  79. Returns:
  80. Nothing.
  81. """
  82. src, = self.bottom.read_outgoing_elements(self.model, source)
  83. tgt, = self.bottom.read_outgoing_elements(self.model, target)
  84. return self._create_association(name, src, tgt,
  85. src_min_c, src_max_c,
  86. tgt_min_c, tgt_max_c)
  87. def _create_association(self, name: str, source: UUID, target: UUID,
  88. src_min_c: int = None, src_max_c: int = None,
  89. tgt_min_c: int = None, tgt_max_c: int = None):
  90. def set_cardinality(bound: str, value: int):
  91. # similar to set_cardinality function defined in create_class
  92. _c_model = self.bottom.create_node()
  93. Integer(_c_model, self.bottom.state).create(value)
  94. _c_node = self.bottom.create_node(str(_c_model))
  95. self.bottom.create_edge(self.model, _c_node, f"{name}.{bound}_cardinality")
  96. _c_link = self.bottom.create_edge(assoc_edge, _c_node)
  97. self.bottom.create_edge(self.model, _c_link, f"{name}.{bound}_cardinality_link")
  98. _scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  99. _scd_link, = self.bottom.read_outgoing_elements(self.scd_model, f"Association_{bound}_cardinality")
  100. self.bottom.create_edge(_c_node, _scd_node, "Morphism")
  101. self.bottom.create_edge(_c_link, _scd_link, "Morphism")
  102. # create association + attributes + morphism links
  103. assoc_edge = self.bottom.create_edge(source, target) # create assoc edge
  104. self.bottom.create_edge(self.model, assoc_edge, name) # attach to model
  105. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Association") # retrieve type
  106. self.bottom.create_edge(assoc_edge, scd_node, "Morphism") # create morphism link
  107. if src_min_c is not None:
  108. set_cardinality("source_lower", src_min_c)
  109. if src_max_c is not None:
  110. set_cardinality("source_upper", src_max_c)
  111. if tgt_min_c is not None:
  112. set_cardinality("target_lower", tgt_min_c)
  113. if tgt_max_c is not None:
  114. set_cardinality("target_upper", tgt_max_c)
  115. def create_global_constraint(self, name: str):
  116. """
  117. Defines a global constraint element.
  118. Args:
  119. name: the name of the global constraint to be created
  120. Returns:
  121. Nothing.
  122. """
  123. # create element + morphism links
  124. element_node = self.bottom.create_node() # create element node
  125. self.bottom.create_edge(self.model, element_node, name) # attach to model
  126. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint") # retrieve type
  127. self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link
  128. def create_attribute(self, name: str):
  129. """
  130. Defines an attribute element.
  131. Args:
  132. name: the name of the attribute to be created
  133. Returns:
  134. Nothing.
  135. """
  136. # create element + morphism links
  137. element_node = self.bottom.create_node() # create element node
  138. self.bottom.create_edge(self.model, element_node, name) # attach to model
  139. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Attribute") # retrieve type
  140. self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link
  141. def create_attribute_link(self, source: str, target: str, name: str, optional: bool):
  142. """
  143. Defines an attribute link element.
  144. Args:
  145. source: element attribute will be attached to
  146. target: attribute element
  147. name: name of the attribute
  148. optional: indicates whether attribute is optional
  149. Returns:
  150. Nothing.
  151. """
  152. tgt, = self.bottom.read_outgoing_elements(self.model, target)
  153. return self._create_attribute_link(source, tgt, name, optional)
  154. def _create_attribute_link(self, source: str, target: UUID, name: str, optional: bool):
  155. # create attribute link + morphism links
  156. src, = self.bottom.read_outgoing_elements(self.model, source)
  157. assoc_edge = self.bottom.create_edge(src, target) # create v edge
  158. self.bottom.create_edge(self.model, assoc_edge, f"{source}_{name}") # attach to model
  159. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") # retrieve type
  160. self.bottom.create_edge(assoc_edge, scd_node, "Morphism") # create morphism link
  161. # name attribute
  162. # Do note that this is an instance of a ModelRef!
  163. name_model = self.bottom.create_node()
  164. String(name_model, self.bottom.state).create(name)
  165. name_node = self.bottom.create_node(str(name_model))
  166. self.bottom.create_edge(self.model, name_node, f"{source}_{name}.name")
  167. name_link = self.bottom.create_edge(assoc_edge, name_node)
  168. self.bottom.create_edge(self.model, name_link, f"{source}_{name}.name_link")
  169. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  170. scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink_name")
  171. self.bottom.create_edge(name_node, scd_node, "Morphism")
  172. self.bottom.create_edge(name_link, scd_link, "Morphism")
  173. # optional attribute
  174. # Do note that this is an instance of a ModelRef!
  175. optional_model = self.bottom.create_node()
  176. Boolean(optional_model, self.bottom.state).create(optional)
  177. optional_node = self.bottom.create_node(str(optional_model))
  178. self.bottom.create_edge(self.model, optional_node, f"{source}_{name}.optional")
  179. optional_link = self.bottom.create_edge(assoc_edge, optional_node)
  180. self.bottom.create_edge(self.model, optional_link, f"{source}_{name}.optional_link")
  181. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  182. scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink_optional")
  183. self.bottom.create_edge(optional_node, scd_node, "Morphism")
  184. self.bottom.create_edge(optional_link, scd_link, "Morphism")
  185. def create_model_ref(self, name: str, model: UUID):
  186. """
  187. Defines a model ref element.
  188. Args:
  189. name: name of the model ref
  190. model: uuid of the external model
  191. Returns:
  192. Nothing.
  193. """
  194. # create element + morphism links
  195. element_node = self.bottom.create_node(str(model)) # create element node
  196. self.bottom.create_edge(self.model, element_node, name) # attach to model
  197. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") # retrieve type
  198. self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link
  199. return element_node
  200. def create_inheritance(self, child: str, parent: str):
  201. """
  202. Defines an inheritance link element.
  203. Args:
  204. child: child element of the inheritance relationship
  205. parent: parent element of the inheritance relationship
  206. Returns:
  207. Nothing.
  208. """
  209. c, = self.bottom.read_outgoing_elements(self.model, child)
  210. p, = self.bottom.read_outgoing_elements(self.model, parent)
  211. return self._create_inheritance(c, p)
  212. def _create_inheritance(self, child: UUID, parent: UUID):
  213. # create inheritance + morphism links
  214. inh_edge = self.bottom.create_edge(child, parent)
  215. child_name = self.get_class_name(child)
  216. parent_name = self.get_class_name(parent)
  217. self.bottom.create_edge(self.model, inh_edge, f"{child_name}_inh_{parent_name}") # attach to model
  218. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance") # retrieve type
  219. self.bottom.create_edge(inh_edge, scd_node, "Morphism") # create morphism link
  220. def get_class_name(self, cls: UUID):
  221. for key in self.bottom.read_keys(self.model):
  222. el, = self.bottom.read_outgoing_elements(self.model, key)
  223. if el == cls:
  224. return key
  225. def add_constraint(self, element: str, code: str):
  226. """
  227. Defines a constraint on an element.
  228. Args:
  229. element: element the constraint is attached to
  230. code: constraint code
  231. Returns:
  232. Nothing.
  233. """
  234. element_node, = self.bottom.read_outgoing_elements(self.model, element) # retrieve element
  235. # code attribute
  236. code_node = self.bottom.create_node(code)
  237. self.bottom.create_edge(self.model, code_node, f"{element}.constraint")
  238. code_link = self.bottom.create_edge(element_node, code_node)
  239. self.bottom.create_edge(self.model, code_link, f"{element}.constraint_link")
  240. scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "ActionCode")
  241. scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "Element_constraint")
  242. self.bottom.create_edge(code_node, scd_node, "Morphism")
  243. self.bottom.create_edge(code_link, scd_link, "Morphism")
  244. def list_elements(self):
  245. """
  246. Lists elements in the model.
  247. Returns:
  248. A list of elements in alphabetical order.
  249. """
  250. scd_names = {}
  251. for key in self.bottom.read_keys(self.scd_model):
  252. element, = self.bottom.read_outgoing_elements(self.scd_model, key)
  253. scd_names[element] = key
  254. unsorted = []
  255. for key in self.bottom.read_keys(self.model):
  256. element, = self.bottom.read_outgoing_elements(self.model, key)
  257. element_types = self.bottom.read_outgoing_elements(element, "Morphism")
  258. type_model_elements = self.bottom.read_outgoing_elements(self.scd_model)
  259. element_type_node, = [e for e in element_types if e in type_model_elements]
  260. unsorted.append(f"{key} : {scd_names[element_type_node]}")
  261. return sorted(unsorted)
  262. def get_classes(self):
  263. class_node, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  264. return self.get_typed_by(class_node)
  265. def get_associations(self):
  266. assoc_node, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  267. return self.get_typed_by(assoc_node)
  268. def get_inheritances(self):
  269. inh_node, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  270. return self.get_typed_by(inh_node)
  271. def get_typed_by(self, type_node: UUID):
  272. name_to_instance = {}
  273. for key in self.bottom.read_keys(self.model):
  274. element, = self.bottom.read_outgoing_elements(self.model, key)
  275. element_types = self.bottom.read_outgoing_elements(element, "Morphism")
  276. if type_node in element_types:
  277. name_to_instance[key] = element
  278. # mapping from instance name to UUID
  279. return name_to_instance
  280. def get_attributes(self, class_name: str):
  281. attr_link_node, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  282. class_node, = self.bottom.read_outgoing_elements(self.model, class_name)
  283. name_to_attr = {}
  284. for edge in self.bottom.read_outgoing_edges(class_node):
  285. edge_types = self.bottom.read_outgoing_elements(edge, "Morphism")
  286. if attr_link_node in edge_types:
  287. name_to_attr[key] = edge
  288. return name_to_attr
  289. def delete_element(self, name: str):
  290. """
  291. Deletes an element from the model.
  292. Args:
  293. name: name of the element to delete
  294. Returns:
  295. Nothing.
  296. """
  297. keys = self.bottom.read_keys(self.model)
  298. r = re.compile(r"{}\..*".format(name))
  299. to_delete = list(filter(r.match, keys))
  300. for key in to_delete:
  301. # TODO: find way to solve memory leak, primitive models are not deleted this way
  302. node, = self.bottom.read_outgoing_elements(self.model, label=key)
  303. self.bottom.delete_element(node)
  304. def to_bottom(self):
  305. # already implemented in terms of LTM bottom
  306. pass
  307. def from_bottom(self):
  308. # already implemented in terms of LTM bottom
  309. pass
  310. if __name__ == '__main__':
  311. from state.devstate import DevState as State
  312. s = State()
  313. from bootstrap.scd import bootstrap_scd
  314. scd = bootstrap_scd(s)
  315. # Retrieve refs to primitive type models
  316. # # integer
  317. int_type_id = s.read_dict(s.read_root(), "Integer")
  318. int_type = UUID(s.read_value(int_type_id))
  319. print(f"Integer Model UUID: {int_type}") # 6
  320. # # string
  321. str_type_id = s.read_dict(s.read_root(), "String")
  322. str_type = UUID(s.read_value(str_type_id))
  323. print(f"String Model UUID: {str_type}") # 16
  324. # Create LTM_PN
  325. model_uuid = s.create_node()
  326. print(f"LTM_PN Model UUID: {model_uuid}") # 845
  327. service = SCD(model_uuid, s)
  328. # Create classes
  329. service.create_class("P")
  330. service.create_class("T")
  331. # Create associations
  332. service.create_association("P2T", "P", "T")
  333. service.create_association("T2P", "T", "P")
  334. # Create model refs
  335. service.create_model_ref("Integer", int_type)
  336. service.create_model_ref("String", int_type)
  337. # Create class attributes
  338. service.create_attribute_link("P", "Integer", "t", False)
  339. service.create_attribute_link("P", "String", "n", False)
  340. service.create_attribute_link("T", "String", "n", False)
  341. # Create association attributes
  342. service.create_attribute_link("P2T", "Integer", "w", False)
  343. service.create_attribute_link("T2P", "Integer", "w", False)