od.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. from uuid import UUID
  2. from state.base import State
  3. from services.bottom.V0 import Bottom
  4. from services.primitives.integer_type import Integer
  5. from services.primitives.string_type import String
  6. from services.primitives.boolean_type import Boolean
  7. from services.primitives.actioncode_type import ActionCode
  8. from framework.conformance import Conformance
  9. from typing import Optional
  def get_slot_link_name(obj_name: str, attr_name: str):
      """Build the name of the link that stores slot `attr_name` of object `obj_name`.

      The result is the two names joined by an underscore: `{obj_name}_{attr_name}`.
      """
      return "{}_{}".format(obj_name, attr_name)
  12. # Object Diagrams service
  class OD:
      """Service for reading and manipulating an object diagram (OD).

      All graph access goes through a `Bottom` service instance. Objects and
      links are attached to `self.model` by a named edge and are connected to
      their type in `self.type_model` by an edge labelled "Morphism".
      """

      def __init__(self, type_model: UUID, model: UUID, state: State):
          """
          Implements services for the object diagrams LTM.
          Implementation is done in terms of services provided by LTM-bottom.

          Args:
              type_model: The SCD-conforming class diagram that contains the types of this object diagram
              model: UUID of the (OD) model to manipulate
              state: State handed to the `Bottom` service used for all reads and writes
          """
          self.type_model = type_model
          self.model = model
          self.bottom = Bottom(state)

      def create_object(self, name: str, class_name: str):
          """Create an object named `name`, typed by the class named `class_name`.

          Returns the node of the new object. Raises if the class is abstract
          (see `_create_object`) or if `class_name` does not resolve to exactly
          one element of the type model.
          """
          class_node, = self.bottom.read_outgoing_elements(self.type_model, class_name)
          return self._create_object(name, class_node)

      def _create_object(self, name: str, class_node: UUID):
          """Create an object named `name`, typed by `class_node`, and return its node.

          Raises:
              Exception: if the class has an "abstract" slot whose value is truthy.
          """
          # Look at our `type_model` as if it's an object diagram:
          mm_od = OD(
              get_scd_mm(self.bottom), # the type model of our type model
              self.type_model,
              self.bottom.state)
          slot = mm_od.get_slot(class_node, "abstract")
          if slot is not None:
              is_abstract, _ = read_primitive_value(self.bottom, slot, self.type_model)
              if is_abstract:
                  raise Exception("Cannot instantiate abstract class!")

          object_node = self.bottom.create_node()
          self.bottom.create_edge(self.model, object_node, name) # attach to model
          self.bottom.create_edge(object_node, class_node, "Morphism") # typed-by link
          return object_node

      def get_class_of_object(self, object_name: str):
          """Return the name (in the type model) of the class of the object named `object_name`."""
          object_node, = self.bottom.read_outgoing_elements(self.model, object_name) # get the object
          return self._get_class_of_object(object_node)

      def _get_class_of_object(self, object_node: UUID):
          """Return the type-model name of the element `object_node` is typed by.

          Returns None when no name in the type model refers to that element.
          """
          type_el, = self.bottom.read_outgoing_elements(object_node, "Morphism")
          # Reverse lookup: find the name under which the type is known in the type model.
          for key in self.bottom.read_keys(self.type_model):
              type_el2, = self.bottom.read_outgoing_elements(self.type_model, key)
              if type_el == type_el2:
                  return key
          return None

      def create_slot(self, attr_name: str, object_name: str, target_name: str):
          """Create the slot `attr_name` on object `object_name`, pointing at element `target_name`.

          Returns the edge of the created slot link.

          Raises:
              Exception: if neither the object's class nor any of its super-types
                  declares an attribute called `attr_name`.
          """
          class_name = self.get_class_of_object(object_name)
          attr_link_name = self.get_attr_link_name(class_name, attr_name)
          if attr_link_name is None:
              # Fail here with a precise message instead of handing `None` to
              # `create_link` as the association name.
              raise Exception(f"No such attribute/association: {class_name}_{attr_name}")
          # An attribute-link is indistinguishable from an ordinary link:
          slot_id = self.create_link(
              get_slot_link_name(object_name, attr_name),
              attr_link_name, object_name, target_name)
          return slot_id

      def get_slot(self, object_node: UUID, attr_name: str):
          """Return the target of the slot `attr_name` of `object_node`.

          Returns None when the object has no outgoing edge typed by that attribute.
          """
          # I really don't like how complex and inefficient it is to read an attribute of an object...
          class_name = self._get_class_of_object(object_node)
          attr_link_name = self.get_attr_link_name(class_name, attr_name)
          type_edge, = self.bottom.read_outgoing_elements(self.type_model, attr_link_name)
          for outgoing_edge in self.bottom.read_outgoing_edges(object_node):
              if type_edge in self.bottom.read_outgoing_elements(outgoing_edge, "Morphism"):
                  slot_ref = self.bottom.read_edge_target(outgoing_edge)
                  return slot_ref
          return None

      def get_slots(self, object_node):
          """Return a list of `(attribute name, slot edge)` pairs for all slots of `object_node`.

          An outgoing edge counts as a slot when the type of its type is the
          "AttributeLink" node of the SCD meta-model.
          """
          attrlink_node = get_scd_mm_attributelink_node(self.bottom)
          slots = []
          for link in self.bottom.read_outgoing_edges(object_node):
              for type_of_link in self.bottom.read_outgoing_elements(link, "Morphism"):
                  for type_of_type_of_link in self.bottom.read_outgoing_elements(type_of_link, "Morphism"):
                      if type_of_type_of_link == attrlink_node:
                          # hooray, we have a slot
                          attr_name = get_attr_name(self.bottom, type_of_link)
                          slots.append((attr_name, link))
          return slots

      def read_slot(self, slot_id):
          """Read the primitive value the slot edge `slot_id` points to.

          Returns the `(value, type name)` pair produced by `read_primitive_value`.
          """
          tgt = self.bottom.read_edge_target(slot_id)
          return read_primitive_value(self.bottom, tgt, self.type_model)

      def create_integer_value(self, name: str, value: int):
          """Create an integer model holding `value` plus a reference to it named `name`; returns `name`."""
          from services.primitives.integer_type import Integer
          int_node = self.bottom.create_node()
          integer_t = Integer(int_node, self.bottom.state)
          integer_t.create(value)
          # By convention, the type model must have a ModelRef named "Integer"
          self.create_model_ref(name, "Integer", int_node)
          return name

      def create_boolean_value(self, name: str, value: bool):
          """Create a boolean model holding `value` plus a reference to it named `name`; returns `name`."""
          from services.primitives.boolean_type import Boolean
          bool_node = self.bottom.create_node()
          bool_service = Boolean(bool_node, self.bottom.state)
          bool_service.create(value)
          # By convention, the type model must have a ModelRef named "Boolean"
          self.create_model_ref(name, "Boolean", bool_node)
          return name

      def create_string_value(self, name: str, value: str):
          """Create a string model holding `value` plus a reference to it named `name`; returns `name`."""
          from services.primitives.string_type import String
          string_node = self.bottom.create_node()
          string_t = String(string_node, self.bottom.state)
          string_t.create(value)
          # By convention, the type model must have a ModelRef named "String"
          self.create_model_ref(name, "String", string_node)
          return name

      def create_actioncode_value(self, name: str, value: str):
          """Create an action-code model holding `value` plus a reference to it named `name`; returns `name`."""
          from services.primitives.actioncode_type import ActionCode
          actioncode_node = self.bottom.create_node()
          actioncode_t = ActionCode(actioncode_node, self.bottom.state)
          actioncode_t.create(value)
          # By convention, the type model must have a ModelRef named "ActionCode"
          self.create_model_ref(name, "ActionCode", actioncode_node)
          return name

      # Identical to the same SCD method:
      def create_model_ref(self, name: str, type_name: str, model: UUID):
          """Create an element named `name`, typed by `type_name`, whose value is `str(model)`.

          Returns the node of the created element.
          """
          # create element + morphism links
          element_node = self.bottom.create_node(str(model)) # create element node
          self.bottom.create_edge(self.model, element_node, name) # attach to model
          type_node, = self.bottom.read_outgoing_elements(self.type_model, type_name) # retrieve type
          self.bottom.create_edge(element_node, type_node, "Morphism") # create morphism link
          return element_node

      # In the type model, the edge that declares an attribute is named `{class_name}_{attr_name}`
      def get_attr_link_name(self, class_name, attr_name):
          """Return the type-model name of the attribute link for `attr_name` of class `class_name`.

          The class itself is tried first, then every type that lists
          `class_name` among its sub-types. Returns None when none of them
          has exactly one element with the expected name.
          """
          assoc_name = f"{class_name}_{attr_name}"
          type_edges = self.bottom.read_outgoing_elements(self.type_model, assoc_name)
          if len(type_edges) == 1:
              return assoc_name

          # look for attribute in the super-types
          conf = Conformance(self.bottom.state, self.type_model, get_scd_mm(self.bottom))
          conf.precompute_sub_types() # only need to know about subtypes
          super_types = (s for s in conf.sub_types if class_name in conf.sub_types[s])
          for s in super_types:
              assoc_name = f"{s}_{attr_name}"
              if len(self.bottom.read_outgoing_elements(self.type_model, assoc_name)) == 1:
                  return assoc_name
          return None

      def create_link(self, link_name: Optional[str], assoc_name: str, src_obj_name: str, tgt_obj_name: str):
          """Create a link typed by `assoc_name` from object `src_obj_name` to object `tgt_obj_name`.

          Args:
              link_name: name of the link in the model; when None, the first unused
                  name of the form `{assoc_name}{i}` (i = 0, 1, ...) is taken.
              assoc_name: name of the association/attribute in the type model.
              src_obj_name: name of the source object in the model.
              tgt_obj_name: name of the target object in the model.

          Returns the edge of the created link.

          Raises:
              Exception: if the type model has no element named `assoc_name`.
          """
          src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name)
          tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)

          # generate a unique name for the link
          if link_name is None:
              i = 0
              while True:
                  link_name = f"{assoc_name}{i}"
                  if len(self.bottom.read_outgoing_elements(self.model, link_name)) == 0:
                      break
                  i += 1

          type_edges = self.bottom.read_outgoing_elements(self.type_model, assoc_name)
          if len(type_edges) == 0:
              raise Exception(f"No such attribute/association: {assoc_name}")
          type_edge = type_edges[0]
          link_id = self._create_link(link_name, type_edge, src_obj_node, tgt_obj_node)
          return link_id

      def _create_link(self, link_name: str, type_edge: UUID, src_obj_node: UUID, tgt_obj_node: UUID):
          """Create an edge from `src_obj_node` to `tgt_obj_node`, name it in the model and type it by `type_edge`."""
          # the link itself is unlabeled:
          link_edge = self.bottom.create_edge(src_obj_node, tgt_obj_node)
          # it is only in the context of the model, that the link has a name:
          self.bottom.create_edge(self.model, link_edge, link_name) # add to model
          self.bottom.create_edge(link_edge, type_edge, "Morphism")
          return link_edge

      def get_objects(self, class_node):
          """Return a `{name: element}` dict of all model elements typed by `class_node`."""
          return get_typed_by(self.bottom, self.model, class_node)

      def get_all_objects(self):
          """Return `{class name: {object name: object node}}` for every class of the type model."""
          scd_mm = get_scd_mm(self.bottom)
          mm_class_node = get_scd_mm_class_node(self.bottom)
          # Read the type model as an object diagram to enumerate its classes.
          all_classes = OD(scd_mm, self.type_model, self.bottom.state).get_objects(mm_class_node)
          result = {}
          for class_name, class_node in all_classes.items():
              result[class_name] = self.get_objects(class_node)
          return result

      def get_all_links(self):
          """Return `{association name: {link name: (link edge, source name, target name)}}`."""
          scd_mm = get_scd_mm(self.bottom)
          mm_assoc_node = get_scd_mm_assoc_node(self.bottom)
          # Read the type model as an object diagram to enumerate its associations.
          all_assocs = OD(scd_mm, self.type_model, self.bottom.state).get_objects(mm_assoc_node)
          result = {}
          for assoc_name, assoc_node in all_assocs.items():
              links = self.get_objects(assoc_node)
              m = {}
              for link_name, link_edge in links.items():
                  src_node = self.bottom.read_edge_source(link_edge)
                  tgt_node = self.bottom.read_edge_target(link_edge)
                  src_name = get_object_name(self.bottom, self.model, src_node)
                  tgt_name = get_object_name(self.bottom, self.model, tgt_node)
                  m[link_name] = (link_edge, src_name, tgt_name)
              result[assoc_name] = m
          return result

      def get_object_name(self, obj: UUID):
          """Return the name under which `obj` is attached to this model, or None if it is not."""
          # Same lookup as the module-level helper, applied to our own model.
          return get_object_name(self.bottom, self.model, obj)
  def get_types(bottom: Bottom, obj: UUID):
      """Return every element `obj` points to through an outgoing "Morphism" edge."""
      morphism_targets = bottom.read_outgoing_elements(obj, "Morphism")
      return morphism_targets
  def get_type(bottom: Bottom, obj: UUID):
      """Return the single type of `obj`, or None when it has no type.

      Raises:
          Exception: if `obj` has more than one type.
      """
      candidates = get_types(bottom, obj)
      count = len(candidates)
      if count > 1:
          raise Exception(f"Expected at most one type. Instead got {count}.")
      if count == 1:
          return candidates[0]
      return None
  def is_typed_by(bottom, el: UUID, typ: UUID):
      """Tell whether `typ` is one of the types of `el`."""
      return any(candidate == typ for candidate in get_types(bottom, el))
  def get_typed_by(bottom, model, type_node: UUID):
      """Collect the elements of `model` that are typed by `type_node`.

      Returns a dict mapping each instance name to its element. Every name of
      the model must refer to exactly one element, otherwise the unpacking
      below raises.
      """
      instances = {}
      for name in bottom.read_keys(model):
          [candidate] = bottom.read_outgoing_elements(model, name)
          if type_node in bottom.read_outgoing_elements(candidate, "Morphism"):
              instances[name] = candidate
      return instances
  def get_scd_mm(bottom):
      """Return the UUID of the SCD meta-model.

      It is stored as the value of the entry "SCD" in the root dictionary of the state.
      """
      state = bottom.state
      scd_entry = state.read_dict(state.read_root(), "SCD")
      return UUID(state.read_value(scd_entry))
  def get_scd_mm_class_node(bottom: Bottom):
      """Return the element named "Class" in the SCD meta-model."""
      class_node = get_scd_mm_node(bottom, "Class")
      return class_node
  def get_scd_mm_attributelink_node(bottom: Bottom):
      """Return the element named "AttributeLink" in the SCD meta-model."""
      attributelink_node = get_scd_mm_node(bottom, "AttributeLink")
      return attributelink_node
  def get_scd_mm_attributelink_name_node(bottom: Bottom):
      """Return the element named "AttributeLink_name" in the SCD meta-model."""
      attributelink_name_node = get_scd_mm_node(bottom, "AttributeLink_name")
      return attributelink_name_node
  def get_scd_mm_assoc_node(bottom: Bottom):
      """Return the element named "Association" in the SCD meta-model."""
      assoc_node = get_scd_mm_node(bottom, "Association")
      return assoc_node
  def get_scd_mm_modelref_node(bottom: Bottom):
      """Return the element named "ModelRef" in the SCD meta-model."""
      modelref_node = get_scd_mm_node(bottom, "ModelRef")
      return modelref_node
  def get_scd_mm_actioncode_node(bottom: Bottom):
      """Return the element named "ActionCode" in the SCD meta-model."""
      actioncode_node = get_scd_mm_node(bottom, "ActionCode")
      return actioncode_node
  def get_scd_mm_node(bottom: Bottom, node_name: str):
      """Return the element called `node_name` in the SCD meta-model.

      The name must refer to exactly one element, otherwise the unpacking raises.
      """
      [mm_node] = bottom.read_outgoing_elements(get_scd_mm(bottom), node_name)
      return mm_node
  def get_scd_mm_class_uppercard_node(bottom: Bottom):
      """Return the element named "Class_upper_cardinality" in the SCD meta-model."""
      uppercard_node = get_scd_mm_node(bottom, "Class_upper_cardinality")
      return uppercard_node
  def get_scd_mm_class_lowercard_node(bottom: Bottom):
      """Return the element named "Class_lower_cardinality" in the SCD meta-model."""
      lowercard_node = get_scd_mm_node(bottom, "Class_lower_cardinality")
      return lowercard_node
  def get_scd_mm_assoc_src_uppercard_node(bottom: Bottom):
      """Return the element named "Association_source_upper_cardinality" in the SCD meta-model."""
      src_uppercard_node = get_scd_mm_node(bottom, "Association_source_upper_cardinality")
      return src_uppercard_node
  def get_scd_mm_assoc_src_lowercard_node(bottom: Bottom):
      """Return the element named "Association_source_lower_cardinality" in the SCD meta-model."""
      src_lowercard_node = get_scd_mm_node(bottom, "Association_source_lower_cardinality")
      return src_lowercard_node
  def get_scd_mm_assoc_tgt_uppercard_node(bottom: Bottom):
      """Return the element named "Association_target_upper_cardinality" in the SCD meta-model."""
      tgt_uppercard_node = get_scd_mm_node(bottom, "Association_target_upper_cardinality")
      return tgt_uppercard_node
  def get_scd_mm_assoc_tgt_lowercard_node(bottom: Bottom):
      """Return the element named "Association_target_lower_cardinality" in the SCD meta-model."""
      tgt_lowercard_node = get_scd_mm_node(bottom, "Association_target_lower_cardinality")
      return tgt_lowercard_node
  def get_object_name(bottom: Bottom, model: UUID, object_node: UUID):
      """Return the first name of `model` that refers to `object_node`.

      Returns None when no name of the model refers to it.
      """
      for name in bottom.read_keys(model):
          named_elements = bottom.read_outgoing_elements(model, name)
          if any(candidate == object_node for candidate in named_elements):
              return name
      return None
  def get_type2(bottom: Bottom, mm: UUID, object_node: UUID):
      """Return `(type node, type name)` for `object_node`.

      The type is the single target of the object's "Morphism" edge; its name
      is looked up in the meta-model `mm`.
      """
      [type_of_object] = bottom.read_outgoing_elements(object_node, "Morphism")
      type_name = get_object_name(bottom, mm, type_of_object)
      return type_of_object, type_name
  def find_outgoing_typed_by(bottom, src: UUID, type_node: UUID):
      """Return the outgoing edges of `src` that are typed by `type_node`.

      Each matching edge appears once, in the order the edges are read.
      """
      return [
          edge
          for edge in bottom.read_outgoing_edges(src)
          if any(
              edge_type == type_node
              for edge_type in bottom.read_outgoing_elements(edge, "Morphism")
          )
      ]
  def find_incoming_typed_by(bottom, tgt: UUID, type_node: UUID):
      """Return the incoming edges of `tgt` that are typed by `type_node`.

      Each matching edge appears once, in the order the edges are read.
      """
      return [
          edge
          for edge in bottom.read_incoming_edges(tgt)
          if any(
              edge_type == type_node
              for edge_type in bottom.read_outgoing_elements(edge, "Morphism")
          )
      ]
  def navigate_modelref(bottom, node: UUID):
      """Return the UUID stored as the value of `node`."""
      return UUID(bottom.read_value(node))
  def find_cardinality(bottom, class_node: UUID, type_node: UUID):
      """Read the cardinality value attached to `class_node` through an edge typed by `type_node`.

      Returns None unless `class_node` has exactly one such outgoing edge.
      """
      card_edges = find_outgoing_typed_by(bottom, class_node, type_node)
      if len(card_edges) != 1:
          return None
      ref = bottom.read_edge_target(card_edges[0])
      # The edge target is a reference to another model; follow it to that model.
      referred_model = navigate_modelref(bottom, ref)
      [integer] = bottom.read_outgoing_elements(referred_model, "integer")
      # finally, the value we're looking for:
      return bottom.read_value(integer)
  def get_attributes(bottom, class_node: UUID):
      """Return a list of `(attribute name, attribute edge)` pairs for `class_node`.

      An attribute is an outgoing edge of the class typed by the
      "AttributeLink" node of the SCD meta-model.
      """
      attributelink_type = get_scd_mm_attributelink_node(bottom)
      return [
          (get_attr_name(bottom, edge), edge)
          for edge in find_outgoing_typed_by(bottom, class_node, attributelink_type)
      ]
  def get_attr_name(bottom, attr_edge: UUID):
      """Return the name (a string value) of the attribute declared by `attr_edge`.

      The name is reached through the outgoing edge of `attr_edge` that is
      typed by the "AttributeLink_name" node of the SCD meta-model; its target
      refers to a model whose "string" element holds the value.

      Raises:
          Exception: if `attr_edge` has no name edge.
          ValueError: if it has more than one name edge (from the unpacking).
      """
      attr_link_name_node = get_scd_mm_attributelink_name_node(bottom)
      name_edges = find_outgoing_typed_by(bottom, attr_edge, attr_link_name_node)
      # Check emptiness before unpacking: unpacking an empty list raises a
      # generic ValueError, so a post-unpack check can never report this case.
      if len(name_edges) == 0:
          raise Exception("Expected attribute to have a name...")
      name_edge, = name_edges
      if name_edge is None:
          raise Exception("Expected attribute to have a name...")
      ref_name = bottom.read_edge_target(name_edge)
      string, = bottom.read_outgoing_elements(
          navigate_modelref(bottom, ref_name),
          "string")
      return bottom.read_value(string)
  309. # We need the meta-model (`mm`) to find out how to read the `modelref`
  def read_primitive_value(bottom, modelref: UUID, mm: UUID):
      """Read the primitive value that `modelref` refers to.

      Args:
          bottom: service used for all graph reads.
          modelref: element whose value is the UUID (as text) of the referred model.
          mm: meta-model in which the name of the modelref's type is looked up.

      Returns:
          A `(value, type name)` pair; the type name is one of "Integer",
          "String", "Boolean" or "ActionCode".

      Raises:
          Exception: if the type of `modelref` is not typed by the "ModelRef"
              node of the SCD meta-model, or if its type name is none of the above.
      """
      typ = get_type(bottom, modelref)
      modelref_node = get_scd_mm_modelref_node(bottom)
      if not is_typed_by(bottom, typ, modelref_node):
          raise Exception("Assertion failed: argument must be typed by ModelRef", typ)

      referred_model = UUID(bottom.read_value(modelref))
      typ_name = get_object_name(bottom, mm, typ)

      # One reader service per supported primitive type name.
      readers = {
          "Integer": Integer,
          "String": String,
          "Boolean": Boolean,
          "ActionCode": ActionCode,
      }
      reader = readers.get(typ_name)
      if reader is None:
          raise Exception("Unimplemented type:", typ_name)
      return reader(referred_model, bottom.state).read(), typ_name