od.py 13 KB


  1. from services import od
  2. from api import cd
  3. from services.bottom.V0 import Bottom
  4. from services.primitives.boolean_type import Boolean
  5. from services.primitives.integer_type import Integer
  6. from services.primitives.string_type import String
  7. from services.primitives.actioncode_type import ActionCode
  8. from services.primitives.bytes_type import Bytes
  9. from uuid import UUID
  10. from typing import Optional
  11. from util.timer import Timer
  12. NEXT_LINK_ID = 0
  13. NEXT_OBJ_ID = 0
  14. # Models map names to elements
  15. # This builds the inverse mapping, so we can quickly lookup the name of an element
  16. def build_name_mapping(state, m):
  17. mapping = {}
  18. bottom = Bottom(state)
  19. for name in bottom.read_keys(m):
  20. elements = bottom.read_outgoing_elements(m, name)
  21. if len(elements) > 1:
  22. print(f"Warning: more than one element with name '{name}'")
  23. mapping[elements[0]] = name
  24. return mapping
  25. class NoSuchSlotException(Exception):
  26. pass
  27. # Object Diagram API
  28. # Intended to replace the 'services.od.OD' class eventually
  29. class ODAPI:
  30. def __init__(self, state, m: UUID, mm: UUID):
  31. self.state = state
  32. self.bottom = Bottom(state)
  33. self.m = m
  34. self.mm = mm
  35. self.od = od.OD(mm, m, state)
  36. self.cdapi = cd.CDAPI(state, mm)
  37. self.create_boolean_value = self.od.create_boolean_value
  38. self.create_integer_value = self.od.create_integer_value
  39. self.create_string_value = self.od.create_string_value
  40. self.create_actioncode_value = self.od.create_actioncode_value
  41. self.create_bytes_value = self.od.create_bytes_value
  42. self.__recompute_mappings()
  43. # Called after every change - makes querying faster but modifying slower
  44. def __recompute_mappings(self):
  45. self.m_obj_to_name = build_name_mapping(self.state, self.m)
  46. self.mm_obj_to_name = build_name_mapping(self.state, self.mm)
  47. self.type_to_objs = { type_name : set() for type_name in self.bottom.read_keys(self.mm)}
  48. for m_name in self.bottom.read_keys(self.m):
  49. m_element, = self.bottom.read_outgoing_elements(self.m, m_name)
  50. tm_element = self.get_type(m_element)
  51. if tm_element in self.mm_obj_to_name:
  52. tm_name = self.mm_obj_to_name[tm_element]
  53. # self.obj_to_type[m_name] = tm_name
  54. self.type_to_objs[tm_name].add(m_name)
  55. def get_value(self, obj: UUID):
  56. return od.read_primitive_value(self.bottom, obj, self.mm)[0]
  57. def get_target(self, link: UUID):
  58. return self.bottom.read_edge_target(link)
  59. def get_source(self, link: UUID):
  60. return self.bottom.read_edge_source(link)
  61. def get_slot(self, obj: UUID, attr_name: str):
  62. slot = self.od.get_slot(obj, attr_name)
  63. if slot == None:
  64. raise NoSuchSlotException(f"Object '{self.m_obj_to_name[obj]}' has no slot '{attr_name}'")
  65. return slot
  66. def get_slot_link(self, obj: UUID, attr_name: str):
  67. return self.od.get_slot_link(obj, attr_name)
  68. # Parameter 'include_subtypes': whether to include subtypes of the given association
  69. def get_outgoing(self, obj: UUID, assoc_name: str, include_subtypes=True):
  70. outgoing = self.bottom.read_outgoing_edges(obj)
  71. result = []
  72. for o in outgoing:
  73. try:
  74. type_of_outgoing_link = self.get_type_name(o)
  75. except:
  76. continue # OK, not all edges are typed
  77. if (include_subtypes and self.cdapi.is_subtype(super_type_name=assoc_name, sub_type_name=type_of_outgoing_link)
  78. or not include_subtypes and type_of_outgoing_link == assoc_name):
  79. result.append(o)
  80. return result
  81. # Parameter 'include_subtypes': whether to include subtypes of the given association
  82. def get_incoming(self, obj: UUID, assoc_name: str, include_subtypes=True):
  83. incoming = self.bottom.read_incoming_edges(obj)
  84. result = []
  85. for i in incoming:
  86. try:
  87. type_of_incoming_link = self.get_type_name(i)
  88. except:
  89. continue # OK, not all edges are typed
  90. if (include_subtypes and self.cdapi.is_subtype(super_type_name=assoc_name, sub_type_name=type_of_incoming_link)
  91. or not include_subtypes and type_of_incoming_link == assoc_name):
  92. result.append(i)
  93. return result
  94. # Returns list of tuples (name, obj)
  95. def get_all_instances(self, type_name: str, include_subtypes=True):
  96. if include_subtypes:
  97. all_types = self.cdapi.transitive_sub_types[type_name]
  98. else:
  99. all_types = set([type_name])
  100. obj_names = [obj_name for type_name in all_types for obj_name in self.type_to_objs[type_name]]
  101. return [(obj_name, self.bottom.read_outgoing_elements(self.m, obj_name)[0]) for obj_name in obj_names]
  102. def get_type(self, obj: UUID):
  103. types = self.bottom.read_outgoing_elements(obj, "Morphism")
  104. if len(types) != 1:
  105. raise Exception(f"Expected obj to have 1 type, instead got {len(types)} types.")
  106. return types[0]
  107. def get_name(self, obj: UUID):
  108. if obj in self.m_obj_to_name:
  109. return self.m_obj_to_name[obj]
  110. elif obj in self.mm_obj_to_name:
  111. return self.mm_obj_to_name[obj]
  112. else:
  113. raise Exception(f"Couldn't find name of {obj} - are you sure it exists in the (meta-)model?")
  114. def get(self, name: str):
  115. results = self.bottom.read_outgoing_elements(self.m, name)
  116. if len(results) == 1:
  117. return results[0]
  118. elif len(results) >= 2:
  119. raise Exception("this should never happen")
  120. else:
  121. raise Exception(f"No such element in model: '{name}'")
  122. def get_type_name(self, obj: UUID):
  123. return self.get_name(self.get_type(obj))
  124. def is_instance(self, obj: UUID, type_name: str, include_subtypes=True):
  125. typ = self.cdapi.get_type(type_name)
  126. types = set(typ) if not include_subtypes else self.cdapi.transitive_sub_types[type_name]
  127. for type_of_obj in self.bottom.read_outgoing_elements(obj, "Morphism"):
  128. if self.get_name(type_of_obj) in types:
  129. return True
  130. return False
  131. def delete(self, obj: UUID):
  132. self.bottom.delete_element(obj)
  133. self.__recompute_mappings()
  134. # Does the the object have the given attribute?
  135. def has_slot(self, obj: UUID, attr_name: str):
  136. return self.od.get_slot_link(obj, attr_name) != None
  137. def get_slots(self, obj: UUID) -> list[str]:
  138. return [attr_name for attr_name, _ in self.od.get_slots(obj)]
  139. def get_slot_value(self, obj: UUID, attr_name: str):
  140. slot = self.get_slot(obj, attr_name)
  141. return self.get_value(slot)
  142. # does the given slot contain code?
  143. # this complements `get_slot_value` which will return code as a string
  144. def slot_has_code(self, obj: UUID, attr_name: str):
  145. slot = self.get_slot(obj, attr_name)
  146. return self.get_type_name(slot) == "ActionCode"
  147. # Returns the given default value if the slot does not exist on the object.
  148. # The attribute must exist in the object's class, or an exception will be thrown.
  149. # The slot may not exist however, if the attribute is defined as 'optional' in the class.
  150. def get_slot_value_default(self, obj: UUID, attr_name: str, default: any):
  151. try:
  152. return self.get_slot_value(obj, attr_name)
  153. except NoSuchSlotException:
  154. return default
  155. # create or update slot value
  156. def set_slot_value(self, obj: UUID, attr_name: str, new_value: any, is_code=False):
  157. obj_name = self.get_name(obj)
  158. link_name = f"{obj_name}_{attr_name}"
  159. target_name = f"{obj_name}.{attr_name}"
  160. old_slot_link = self.get_slot_link(obj, attr_name)
  161. if old_slot_link != None:
  162. old_target = self.get_target(old_slot_link)
  163. # if old_target != None:
  164. self.bottom.delete_element(old_target) # this also deletes the slot-link
  165. new_target = self.create_primitive_value(target_name, new_value, is_code)
  166. slot_type = self.cdapi.find_attribute_type(self.get_type_name(obj), attr_name)
  167. new_link = self.od._create_link(link_name, slot_type, obj, new_target)
  168. self.__recompute_mappings()
  169. def create_primitive_value(self, name: str, value: any, is_code=False):
  170. # watch out: in Python, 'bool' is subtype of 'int'
  171. # so we must check for 'bool' first
  172. if isinstance(value, bool):
  173. tgt = self.create_boolean_value(name, value)
  174. elif isinstance(value, int):
  175. tgt = self.create_integer_value(name, value)
  176. elif isinstance(value, str):
  177. if is_code:
  178. tgt = self.create_actioncode_value(name, value)
  179. else:
  180. tgt = self.create_string_value(name, value)
  181. elif isinstance(value, bytes):
  182. tgt = self.create_bytes_value(name, value)
  183. else:
  184. raise Exception("Unimplemented type "+value)
  185. self.__recompute_mappings()
  186. return tgt
  187. def overwrite_primitive_value(self, name: str, value: any, is_code=False):
  188. referred_model = UUID(self.bottom.read_value(self.get(name)))
  189. to_overwrite_type = self.get_type_name(self.get(name))
  190. # watch out: in Python, 'bool' is subtype of 'int'
  191. # so we must check for 'bool' first
  192. if isinstance(value, bool):
  193. if to_overwrite_type != "Boolean":
  194. raise Exception(f"Cannot assign boolean value '{value}' to value of type {to_overwrite_type}.")
  195. Boolean(referred_model, self.state).create(value)
  196. elif isinstance(value, int):
  197. if to_overwrite_type != "Integer":
  198. raise Exception(f"Cannot assign integer value '{value}' to value of type {to_overwrite_type}.")
  199. Integer(referred_model, self.state).create(value)
  200. elif isinstance(value, str):
  201. if is_code:
  202. if to_overwrite_type != "ActionCode":
  203. raise Exception(f"Cannot assign code to value of type {to_overwrite_type}.")
  204. ActionCode(referred_model, self.state).create(value)
  205. else:
  206. if to_overwrite_type != "String":
  207. raise Exception(f"Cannot assign string value '{value}' to value of type {to_overwrite_type}.")
  208. String(referred_model, self.state).create(value)
  209. elif isinstance(value, bytes):
  210. if to_overwrite_type != "Bytes":
  211. raise Exception(f"Cannot assign bytes value '{value}' to value of type {to_overwrite_type}.")
  212. Bytes(referred_model, self.state).create(value)
  213. else:
  214. raise Exception("Unimplemented type "+value)
  215. def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID):
  216. global NEXT_LINK_ID
  217. types = self.bottom.read_outgoing_elements(self.mm, assoc_name)
  218. if len(types) == 0:
  219. raise Exception(f"No such association: '{assoc_name}'")
  220. elif len(types) >= 2:
  221. raise Exception(f"More than one association exists with name '{assoc_name}' - this means the MM is invalid.")
  222. typ = types[0]
  223. if link_name == None:
  224. link_name = f"__{assoc_name}{NEXT_LINK_ID}"
  225. NEXT_LINK_ID += 1
  226. link_id = self.od._create_link(link_name, typ, src, tgt)
  227. self.__recompute_mappings()
  228. return link_id
  229. def create_object(self, object_name: Optional[str], class_name: str):
  230. global NEXT_OBJ_ID
  231. if object_name == None:
  232. object_name = f"__{class_name}{NEXT_OBJ_ID}"
  233. NEXT_OBJ_ID += 1
  234. obj = self.od.create_object(object_name, class_name)
  235. self.__recompute_mappings()
  236. return obj
  237. # internal use
  238. # Get API methods as bound functions, to pass as globals to 'eval'
  239. # Readonly version is used for:
  240. # - Conformance checking
  241. # - Pattern matching (LHS/NAC of rule)
  242. def bind_api_readonly(odapi):
  243. funcs = {
  244. 'read_value': odapi.state.read_value,
  245. 'get': odapi.get,
  246. 'get_value': odapi.get_value,
  247. 'get_target': odapi.get_target,
  248. 'get_source': odapi.get_source,
  249. 'get_slot': odapi.get_slot,
  250. 'get_slots': odapi.get_slots,
  251. 'get_slot_value': odapi.get_slot_value,
  252. 'get_slot_value_default': odapi.get_slot_value_default,
  253. 'get_all_instances': odapi.get_all_instances,
  254. 'get_name': odapi.get_name,
  255. 'get_type_name': odapi.get_type_name,
  256. 'get_outgoing': odapi.get_outgoing,
  257. 'get_incoming': odapi.get_incoming,
  258. 'has_slot': odapi.has_slot,
  259. 'is_instance': odapi.is_instance,
  260. }
  261. return funcs
  262. # internal use
  263. # Get API methods as bound functions, to pass as globals to 'eval'
  264. # Read/write version is used for:
  265. # - Graph rewriting (RHS of rule)
  266. def bind_api(odapi):
  267. funcs = {
  268. **bind_api_readonly(odapi),
  269. 'create_object': odapi.create_object,
  270. 'create_link': odapi.create_link,
  271. 'delete': odapi.delete,
  272. 'set_slot_value': odapi.set_slot_value,
  273. }
  274. return funcs