123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- from state.base import State
- from bootstrap.scd import bootstrap_scd
- from bootstrap.pn import bootstrap_pn
- from services import implemented as services
- from framework.conformance import Conformance
- from uuid import UUID
- class Manager:
- def __init__(self, state: State):
- self.current_model = None
- self.current_context = None
- self.state = state
- bootstrap_scd(state)
- # bootstrap_pn(state, "PN")
- scd_node = self.state.read_dict(self.state.read_root(), "SCD")
- for key_node in self.state.read_dict_keys(self.state.read_root()):
- model_node = self.state.read_dict_node(self.state.read_root(), key_node)
- self.state.create_edge(model_node, scd_node)
- def get_models(self):
- """
- Retrieves all existing models
-
- Returns:
- Names of exising models
- """
- for key_node in self.state.read_dict_keys(self.state.read_root()):
- yield self.state.read_value(key_node)
- def instantiate_model(self, type_model_name: str, name: str):
- """
- Retrieves all existing models
-
- Args:
- type_model_name: name of the type model we want to instantiate
- name: name of the instance model to be created
- Returns:
- Nothing
- """
- root = self.state.read_root()
- type_model_node = self.state.read_dict(root, type_model_name)
- if type_model_node == None:
- raise RuntimeError(f"No type model with name {type_model_name} found.")
- else:
- # check if model is a linguistic type model
- scd_node = self.state.read_dict(self.state.read_root(), "SCD")
- incoming = self.state.read_incoming(scd_node)
- incoming = [self.state.read_edge(e)[0] for e in incoming]
- if type_model_node not in incoming:
- raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
- if name in map(self.state.read_value, self.state.read_dict_keys(root)):
- raise RuntimeError(f"Model with name {name} already exists.")
- new_model_root = self.state.create_node()
- new_model_node = self.state.create_nodevalue(str(new_model_root))
- self.state.create_dict(root, name, new_model_node)
- self.state.create_edge(new_model_node, type_model_node)
- self.current_model = (name, new_model_root)
- if type_model_name not in services:
- raise RuntimeError(f"Services for type {type_model_name} not implemented.")
- self.current_context = services[type_model_name](self.current_model[1], self.state)
- def select_model(self, name: str):
- """
- Select a model to interact with
- Args:
- name: name of the model we want to interact with
- Returns:
- Nothing
- """
- root = self.state.read_root()
- model_node = self.state.read_dict(root, name)
- if model_node == None:
- raise RuntimeError(f"No model with name {name} found.")
- model_root = UUID(self.state.read_value(model_node))
- self.current_model = (name, model_root)
- def close_model(self):
- """
- Clear the currently selected model
- Returns:
- Nothing
- """
- self.current_model = None
- self.current_context = None
- def get_types(self):
- """
- Retrieve the types of the currently selected model
- Returns:
- Names of the model's types
- """
- root = self.state.read_root()
- if self.current_model == None:
- raise RuntimeError(f"No model currently selected.")
- name, model = self.current_model
- model_id = self.state.read_dict(root, name)
- outgoing = self.state.read_outgoing(model_id)
- outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
- elements = [self.state.read_edge(e)[1] for e in outgoing]
- for e in elements:
- incoming = self.state.read_incoming(e)
- label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
- label_edge, = self.state.read_outgoing(label_edge)
- _, label_node = self.state.read_edge(label_edge)
- yield self.state.read_value(label_node)
- def select_context(self, name: str):
- """
- Select a type to set as the current context
- Args:
- name: name of the type/context
- Returns:
- Nothing
- """
- if name not in self.get_types():
- raise RuntimeError(f"No type {name} that currently selected model conforms to.")
- if name not in services:
- raise RuntimeError(f"Services for type {name} not implemented.")
- self.current_context = services[name](self.current_model[1], self.state)
- self.current_context.from_bottom()
- def close_context(self):
- """
- Exit the current (type) context
- Returns:
- Nothing
- """
- self.current_context.to_bottom()
- self.current_context = None
- def get_services(self):
- """
- Retrieve the services available in the current context
- Returns:
- Functions exposed by the current context's implementation
- """
- if self.current_model == None:
- raise RuntimeError(f"No model currently selected.")
- if self.current_context == None:
- raise RuntimeError(f"No context currently selected.")
- yield from [
- getattr(self.current_context, func)
- for func in dir(self.current_context)
- if callable(getattr(self.current_context, func))
- and not func.startswith("__")
- and not func == "from_bottom"
- and not func == "to_bottom"
- ]
- def check_conformance(self, type_model_name: str, model_name: str):
- """
- If there are existing morphisms between the model and type model
- check nominal conformance
- Else
- find conformance using structural conformance check
- Args:
- type_model_name: name of the type model to check conformance against
- model_name: name of the instance model
- Returns:
- Boolean indicating whether conformance was found
- """
- root = self.state.read_root()
- type_model_node = self.state.read_dict(root, type_model_name)
- if type_model_node == None:
- raise RuntimeError(f"No type model with name {type_model_name} found.")
- model_node = self.state.read_dict(root, model_name)
- if model_node == None:
- raise RuntimeError(f"No model with name {model_node} found.")
- types = self.state.read_outgoing(model_node)
- types = [self.state.read_edge(e)[1] for e in types]
- # if type_model_node not in types:
- if True:
- print("checking structural conformance")
- conf = Conformance(self.state,
- UUID(self.state.read_value(model_node)),
- UUID(self.state.read_value(type_model_node))).check_structural(log=True)
- if conf:
- self.state.create_edge(model_node, type_model_node)
- return conf
- else:
- print("checking nominal conformance")
- return Conformance(self.state,
- UUID(self.state.read_value(model_node)),
- UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
- def dump_state(self):
- """
- Dumps the current state of the Modelverse to a pickle file
- """
- import pickle
- with open("state.p", "wb") as file:
- pickle.dump(self.state, file)
- def load_state(self):
- """
- Loas a state of the Modelverse from a pickle file
- """
- import pickle
- with open("state.p", "rb") as file:
- self.state = pickle.load(file)
- def to_graphviz(self):
- self.state.dump("state.dot")
- if __name__ == '__main__':
- from state.devstate import DevState
- s = DevState()
- m = Manager(s)
- m.select_model("SCD")
- m.select_context("SCD")
- for f in m.get_services():
- print(f)
|