123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- from uuid import UUID
- from state.base import State
- from services.bottom.V0 import Bottom
- from services.primitives.integer_type import Integer
- from services.primitives.string_type import String
- from services.primitives.boolean_type import Boolean
- from services.primitives.actioncode_type import ActionCode
- from api.cd import CDAPI
- from typing import Optional
- def get_slot_link_name(obj_name: str, attr_name: str):
- return f"{obj_name}_{attr_name}"
- # Object Diagrams service
- class OD:
- def __init__(self, type_model: UUID, model: UUID, state: State):
- """
- Implements services for the object diagrams LTM.
- Implementation is done in terms of services provided by LTM-bottom.
- Args:
- type_model: The SCD-conforming class diagram that contains the types of this object diagram
- model: UUID of the (OD) model to manipulate
- """
- self.scd_model = UUID(state.read_value(state.read_dict(state.read_root(), "SCD")))
- self.type_model = type_model
- self.model = model
- self.bottom = Bottom(state)
- self.cd = CDAPI(self.bottom.state, self.type_model)
- def create_object(self, name: str, class_name: str):
- class_nodes = self.bottom.read_outgoing_elements(self.type_model, class_name)
- if len(class_nodes) == 0:
- raise Exception(f"Cannot create object: No such class '{class_name}'")
- class_node = class_nodes[0]
- abstract_nodes = self.bottom.read_outgoing_elements(self.type_model, f"{class_name}.abstract")
- try:
- return self._create_object(name, class_node)
- except Exception as e:
- e.add_note("Class: " + class_name)
- raise
- def _create_object(self, name: str, class_node: UUID):
- # Look at our `type_model` as if it's an object diagram:
- mm_od = OD(
- get_scd_mm(self.bottom), # the type model of our type model
- self.type_model,
- self.bottom.state)
- slot = mm_od.get_slot(class_node, "abstract")
- if slot != None:
- is_abstract, _ = read_primitive_value(self.bottom, slot, self.scd_model)
- if is_abstract:
- raise Exception("Cannot instantiate abstract class!")
- object_node = self.bottom.create_node()
- self.bottom.create_edge(self.model, object_node, name) # attach to model
- self.bottom.create_edge(object_node, class_node, "Morphism") # typed-by link
- return object_node
- def get_class_of_object(self, object_name: str):
- object_node, = self.bottom.read_outgoing_elements(self.model, object_name) # get the object
- return self._get_class_of_object(object_node)
- def _get_class_of_object(self, object_node: UUID):
- type_el, = self.bottom.read_outgoing_elements(object_node, "Morphism")
- for key in self.bottom.read_keys(self.type_model):
- type_el2, = self.bottom.read_outgoing_elements(self.type_model, key)
- if type_el == type_el2:
- return key
- def create_slot(self, attr_name: str, object_name: str, target_name: str):
- class_name = self.get_class_of_object(object_name)
- attr_link_name = self.get_attr_link_name(class_name, attr_name)
- if attr_link_name == None:
- raise Exception(f"Failed to get link name for attribute '{attr_name}' of object '{object_name}'")
- # An attribute-link is indistinguishable from an ordinary link:
- slot_link_name = get_slot_link_name(object_name, attr_name)
- slot_id = self.create_link(slot_link_name,
- attr_link_name, object_name, target_name)
- return slot_id
- def get_slot(self, object_node: UUID, attr_name: str):
- edge = self.get_slot_link(object_node, attr_name)
- slot_ref = self.bottom.read_edge_target(edge)
- return slot_ref
- def get_slot_link(self, object_node: UUID, attr_name: str):
- # I really don't like how complex and inefficient it is to read an attribute of an object...
- class_name = self._get_class_of_object(object_node)
- attr_link_name = self.get_attr_link_name(class_name, attr_name)
- if attr_link_name == None:
- raise Exception(f"Type '{class_name}' has no attribute '{attr_name}'")
- type_edge, = self.bottom.read_outgoing_elements(self.type_model, attr_link_name)
- for outgoing_edge in self.bottom.read_outgoing_edges(object_node):
- if type_edge in self.bottom.read_outgoing_elements(outgoing_edge, "Morphism"):
- return outgoing_edge
- def get_slots(self, object_node):
- attrlink_node = get_scd_mm_attributelink_node(self.bottom)
- slots = []
- outgoing_links = self.bottom.read_outgoing_edges(object_node)
- for l in outgoing_links:
- for type_of_link in self.bottom.read_outgoing_elements(l, "Morphism"):
- for type_of_type_of_link in self.bottom.read_outgoing_elements(type_of_link, "Morphism"):
- if type_of_type_of_link == attrlink_node:
- # hooray, we have a slot
- attr_name = get_attr_name(self.bottom, type_of_link)
- slots.append((attr_name, l))
- return slots
- def read_slot(self, slot_id):
- tgt = self.bottom.read_edge_target(slot_id)
- return read_primitive_value(self.bottom, tgt, self.type_model)
- def create_integer_value(self, name: str, value: int):
- from services.primitives.integer_type import Integer
- int_node = self.bottom.create_node()
- integer_t = Integer(int_node, self.bottom.state)
- integer_t.create(value)
- # name = 'int'+str(value) # name of the ref to the created integer
- # By convention, the type model must have a ModelRef named "Integer"
- return self.create_model_ref(name, "Integer", int_node)
- def create_boolean_value(self, name: str, value: bool):
- from services.primitives.boolean_type import Boolean
- bool_node = self.bottom.create_node()
- bool_service = Boolean(bool_node, self.bottom.state)
- bool_service.create(value)
- return self.create_model_ref(name, "Boolean", bool_node)
- def create_string_value(self, name: str, value: str):
- from services.primitives.string_type import String
- string_node = self.bottom.create_node()
- string_t = String(string_node, self.bottom.state)
- string_t.create(value)
- return self.create_model_ref(name, "String", string_node)
- def create_actioncode_value(self, name: str, value: str):
- from services.primitives.actioncode_type import ActionCode
- actioncode_node = self.bottom.create_node()
- actioncode_t = ActionCode(actioncode_node, self.bottom.state)
- actioncode_t.create(value)
- return self.create_model_ref(name, "ActionCode", actioncode_node)
- # Identical to the same SCD method:
- def create_model_ref(self, name: str, type_name: str, model: UUID):
- # create element + morphism links
- element_node = self.bottom.create_node(str(model)) # create element node
- self.bottom.create_edge(self.model, element_node, name) # attach to model
- type_node, = self.bottom.read_outgoing_elements(self.type_model, type_name) # retrieve type
- self.bottom.create_edge(element_node, type_node, "Morphism") # create morphism link
- # print('model ref:', name, type_name, element_node, model)
- return element_node
- # The edge connecting an object to the value of a slot must be named `{object_name}_{attr_name}`
- def get_attr_link_name(self, class_name, attr_name):
- return self.cd.get_attr_link_name(class_name, attr_name)
- def create_link(self, link_name: Optional[str], assoc_name: str, src_obj_name: str, tgt_obj_name: str):
- src_obj_nodes = self.bottom.read_outgoing_elements(self.model, src_obj_name)
- if len(src_obj_nodes) == 0:
- raise Exception(f"Cannot create link '{link_name}' ({assoc_name}): source object '{src_obj_name}' not found")
- src_obj_node = src_obj_nodes[0]
- tgt_obj_nodes = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)
- if len(tgt_obj_nodes) == 0:
- raise Exception(f"Cannot create link '{link_name}' ({assoc_name}): target object '{tgt_obj_name}' not found")
- tgt_obj_node = tgt_obj_nodes[0]
- # generate a unique name for the link
- if link_name == None:
- i = 0;
- while True:
- link_name = f"{assoc_name}{i}"
- if len(self.bottom.read_outgoing_elements(self.model, link_name)) == 0:
- break
- i += 1
- type_edges = self.bottom.read_outgoing_elements(self.type_model, assoc_name)
- if len(type_edges) == 0:
- raise Exception(f"No such attribute/association: {assoc_name}")
- type_edge = type_edges[0]
- link_id = self._create_link(link_name, type_edge, src_obj_node, tgt_obj_node)
- return link_id
- # used for attribute-links and association-links
- def _create_link(self, link_name: str, type_edge: UUID, src_obj_node: UUID, tgt_obj_node: UUID):
- # print('create_link', link_name, type_edge, src_obj_node, tgt_obj_node)
- if not isinstance(src_obj_node, UUID):
- raise Exception("Expected source object to be UUID")
- if not isinstance(tgt_obj_node, UUID):
- raise Exception("Expected target object to be UUID")
- # the link itself is unlabeled:
- link_edge = self.bottom.create_edge(src_obj_node, tgt_obj_node)
- if link_edge == None:
- # Why does the above call silently fail??????
- raise Exception("Could not create link")
- # it is only in the context of the model, that the link has a name:
- self.bottom.create_edge(self.model, link_edge, link_name) # add to model
- self.bottom.create_edge(link_edge, type_edge, "Morphism")
- return link_edge
- def get_objects(self, class_node):
- return get_typed_by(self.bottom, self.model, class_node)
- def get_all_objects(self):
- scd_mm = get_scd_mm(self.bottom)
- class_node = get_scd_mm_class_node(self.bottom)
- all_classes = OD(scd_mm, self.type_model, self.bottom.state).get_objects(class_node)
- result = {}
- for class_name, class_node in all_classes.items():
- objects = self.get_objects(class_node)
- result[class_name] = objects
- return result
- def get_all_links(self):
- scd_mm = get_scd_mm(self.bottom)
- assoc_node = get_scd_mm_assoc_node(self.bottom)
- all_classes = OD(scd_mm, self.type_model, self.bottom.state).get_objects(assoc_node)
- result = {}
- for assoc_name, assoc_node in all_classes.items():
- links = self.get_objects(assoc_node)
- m = {}
- for link_name, link_edge in links.items():
- src_node = self.bottom.read_edge_source(link_edge)
- tgt_node = self.bottom.read_edge_target(link_edge)
- src_name = get_object_name(self.bottom, self.model, src_node)
- tgt_name = get_object_name(self.bottom, self.model, tgt_node)
- m[link_name] = (link_edge, src_name, tgt_name)
- result[assoc_name] = m
- return result
- def get_object_name(self, obj: UUID):
- for key in self.bottom.read_keys(self.model):
- for el in self.bottom.read_outgoing_elements(self.model, key):
- if el == obj:
- return key
- def get_types(bottom: Bottom, obj: UUID):
- return bottom.read_outgoing_elements(obj, "Morphism")
- def get_type(bottom: Bottom, obj: UUID):
- types = get_types(bottom, obj)
- if len(types) == 1:
- return types[0]
- elif len(types) > 1:
- raise Exception(f"Expected at most one type. Instead got {len(types)}.")
- def is_typed_by(bottom, el: UUID, typ: UUID):
- for typed_by in get_types(bottom, el):
- if typed_by == typ:
- return True
- return False
- def get_typed_by(bottom, model, type_node: UUID):
- name_to_instance = {}
- for key in bottom.read_keys(model):
- els = bottom.read_outgoing_elements(model, key)
- if len(els) > 1:
- raise Exception(f"Assertion failed: Model contains more than one object named '{key}'!")
- element = els[0]
- element_types = bottom.read_outgoing_elements(element, "Morphism")
- if type_node in element_types:
- name_to_instance[key] = element
- # mapping from instance name to UUID
- return name_to_instance
- def get_scd_mm(bottom):
- scd_metamodel_id = bottom.state.read_dict(bottom.state.read_root(), "SCD")
- scd_metamodel = UUID(bottom.state.read_value(scd_metamodel_id))
- return scd_metamodel
- def get_scd_mm_class_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Class")
- def get_scd_mm_attributelink_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "AttributeLink")
- def get_scd_mm_attributelink_name_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "AttributeLink_name")
- def get_scd_mm_assoc_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Association")
- def get_scd_mm_modelref_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "ModelRef")
- def get_scd_mm_actioncode_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "ActionCode")
- def get_scd_mm_node(bottom: Bottom, node_name: str):
- scd_metamodel = get_scd_mm(bottom)
- node, = bottom.read_outgoing_elements(scd_metamodel, node_name)
- return node
- def get_scd_mm_class_uppercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Class_upper_cardinality")
- def get_scd_mm_class_lowercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Class_lower_cardinality")
- def get_scd_mm_assoc_src_uppercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Association_source_upper_cardinality")
- def get_scd_mm_assoc_src_lowercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Association_source_lower_cardinality")
- def get_scd_mm_assoc_tgt_uppercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Association_target_upper_cardinality")
- def get_scd_mm_assoc_tgt_lowercard_node(bottom: Bottom):
- return get_scd_mm_node(bottom, "Association_target_lower_cardinality")
- def get_object_name(bottom: Bottom, model: UUID, object_node: UUID):
- for key in bottom.read_keys(model):
- for el in bottom.read_outgoing_elements(model, key):
- if el == object_node:
- return key
- def get_type2(bottom: Bottom, mm: UUID, object_node: UUID):
- type_node, = bottom.read_outgoing_elements(object_node, "Morphism")
- return type_node, get_object_name(bottom, mm, type_node)
- def find_outgoing_typed_by(bottom, src: UUID, type_node: UUID):
- edges = []
- for outgoing_edge in bottom.read_outgoing_edges(src):
- for typedBy in bottom.read_outgoing_elements(outgoing_edge, "Morphism"):
- if typedBy == type_node:
- edges.append(outgoing_edge)
- break
- return edges
- def find_incoming_typed_by(bottom, tgt: UUID, type_node: UUID):
- edges = []
- for incoming_edge in bottom.read_incoming_edges(tgt):
- for typedBy in bottom.read_outgoing_elements(incoming_edge, "Morphism"):
- if typedBy == type_node:
- edges.append(incoming_edge)
- break
- return edges
- def navigate_modelref(bottom, node: UUID):
- uuid = bottom.read_value(node)
- return UUID(uuid)
- def find_cardinality(bottom, class_node: UUID, type_node: UUID):
- upper_card_edges = find_outgoing_typed_by(bottom, class_node, type_node)
- if len(upper_card_edges) == 1:
- ref = bottom.read_edge_target(upper_card_edges[0])
- integer, = bottom.read_outgoing_elements(
- navigate_modelref(bottom, ref),
- "integer")
- # finally, the value we're looking for:
- return bottom.read_value(integer)
- def get_attributes(bottom, class_node: UUID):
- attr_link_node = get_scd_mm_attributelink_node(bottom)
- attr_edges = find_outgoing_typed_by(bottom, class_node, attr_link_node)
- result = []
- for attr_edge in attr_edges:
- attr_name = get_attr_name(bottom, attr_edge)
- result.append((attr_name, attr_edge))
- return result
- def get_attr_name(bottom, attr_edge: UUID):
- attr_link_name_node = get_scd_mm_attributelink_name_node(bottom)
- name_edge, = find_outgoing_typed_by(bottom, attr_edge, attr_link_name_node)
- if name_edge == None:
- raise Exception("Expected attribute to have a name...")
- ref_name = bottom.read_edge_target(name_edge)
- string, = bottom.read_outgoing_elements(
- navigate_modelref(bottom, ref_name),
- "string")
- return bottom.read_value(string)
- # We need the meta-model (`mm`) to find out how to read the `modelref`
- def read_primitive_value(bottom, modelref: UUID, mm: UUID):
- typ = get_type(bottom, modelref)
- if not is_typed_by(bottom, typ, get_scd_mm_modelref_node(bottom)):
- raise Exception("Assertion failed: argument must be typed by ModelRef", typ)
- referred_model = UUID(bottom.read_value(modelref))
- typ_name = get_object_name(bottom, model=mm, object_node=typ)
- if typ_name == "Integer":
- return Integer(referred_model, bottom.state).read(), typ_name
- elif typ_name == "String":
- return String(referred_model, bottom.state).read(), typ_name
- elif typ_name == "Boolean":
- return Boolean(referred_model, bottom.state).read(), typ_name
- elif typ_name == "ActionCode":
- return ActionCode(referred_model, bottom.state).read(), typ_name
- else:
- raise Exception("Unimplemented type:", typ_name)
|