manager.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. from state.base import State
  2. from bootstrap.scd import bootstrap_scd
  3. from services import implemented as services
  4. from framework.conformance import Conformance
  5. from uuid import UUID
  6. class Manager:
  7. def __init__(self, state: State):
  8. self.current_model = None
  9. self.current_context = None
  10. self.state = state
  11. bootstrap_scd(state)
  12. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  13. for key_node in self.state.read_dict_keys(self.state.read_root()):
  14. model_node = self.state.read_dict_node(self.state.read_root(), key_node)
  15. self.state.create_edge(model_node, scd_node)
  16. def get_models(self):
  17. """
  18. Retrieves all existing models
  19. Returns:
  20. Names of exising models
  21. """
  22. for key_node in self.state.read_dict_keys(self.state.read_root()):
  23. yield self.state.read_value(key_node)
  24. def instantiate_model(self, type_model_name: str, name: str):
  25. """
  26. Retrieves all existing models
  27. Args:
  28. type_model_name: name of the type model we want to instantiate
  29. name: name of the instance model to be created
  30. Returns:
  31. Nothing
  32. """
  33. root = self.state.read_root()
  34. type_model_node = self.state.read_dict(root, type_model_name)
  35. if type_model_node is None:
  36. raise RuntimeError(f"No type model with name {type_model_name} found.")
  37. else:
  38. # check if model is a linguistic type model
  39. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  40. incoming = self.state.read_incoming(scd_node)
  41. incoming = [self.state.read_edge(e)[0] for e in incoming]
  42. if type_model_node not in incoming:
  43. raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
  44. if name in map(self.state.read_value, self.state.read_dict_keys(root)):
  45. raise RuntimeError(f"Model with name {name} already exists.")
  46. new_model_root = self.state.create_node()
  47. new_model_node = self.state.create_nodevalue(str(new_model_root))
  48. self.state.create_dict(root, name, new_model_node)
  49. self.state.create_edge(new_model_node, type_model_node)
  50. self.current_model = (name, new_model_root)
  51. if type_model_name not in services:
  52. raise RuntimeError(f"Services for type {type_model_name} not implemented.")
  53. self.current_context = services[type_model_name](self.current_model[1], self.state)
  54. def select_model(self, name: str):
  55. """
  56. Select a model to interact with
  57. Args:
  58. name: name of the model we want to interact with
  59. Returns:
  60. Nothing
  61. """
  62. root = self.state.read_root()
  63. model_node = self.state.read_dict(root, name)
  64. if model_node is None:
  65. raise RuntimeError(f"No model with name {name} found.")
  66. model_root = UUID(self.state.read_value(model_node))
  67. self.current_model = (name, model_root)
  68. def close_model(self):
  69. """
  70. Clear the currently selected model
  71. Returns:
  72. Nothing
  73. """
  74. self.current_model = None
  75. self.current_context = None
  76. def get_types(self):
  77. """
  78. Retrieve the types of the currently selected model
  79. Returns:
  80. Names of the model's types
  81. """
  82. root = self.state.read_root()
  83. if self.current_model is None:
  84. raise RuntimeError(f"No model currently selected.")
  85. name, model = self.current_model
  86. model_id = self.state.read_dict(root, name)
  87. outgoing = self.state.read_outgoing(model_id)
  88. outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
  89. elements = [self.state.read_edge(e)[1] for e in outgoing]
  90. for e in elements:
  91. incoming = self.state.read_incoming(e)
  92. label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
  93. label_edge, = self.state.read_outgoing(label_edge)
  94. _, label_node = self.state.read_edge(label_edge)
  95. yield self.state.read_value(label_node)
  96. def select_context(self, name: str):
  97. """
  98. Select a type to set as the current context
  99. Args:
  100. name: name of the type/context
  101. Returns:
  102. Nothing
  103. """
  104. if name not in self.get_types():
  105. raise RuntimeError(f"No type {name} that currently selected model conforms to.")
  106. if name not in services:
  107. raise RuntimeError(f"Services for type {name} not implemented.")
  108. self.current_context = services[name](self.current_model[1], self.state)
  109. self.current_context.from_bottom()
  110. def close_context(self):
  111. """
  112. Exit the current (type) context
  113. Returns:
  114. Nothing
  115. """
  116. self.current_context.to_bottom()
  117. self.current_context = None
  118. def get_services(self):
  119. """
  120. Retrieve the services available in the current context
  121. Returns:
  122. Functions exposed by the current context's implementation
  123. """
  124. if self.current_model is None:
  125. raise RuntimeError(f"No model currently selected.")
  126. if self.current_context is None:
  127. raise RuntimeError(f"No context currently selected.")
  128. yield from [
  129. getattr(self.current_context, func)
  130. for func in dir(self.current_context)
  131. if callable(getattr(self.current_context, func))
  132. and not func.startswith("__")
  133. and not func == "from_bottom"
  134. and not func == "to_bottom"
  135. ]
  136. def check_conformance(self, type_model_name: str, model_name: str):
  137. """
  138. If there are existing morphisms between the model and type model
  139. check nominal conformance
  140. Else
  141. find conformance using structural conformance check
  142. Args:
  143. type_model_name: name of the type model to check conformance against
  144. model_name: name of the instance model
  145. Returns:
  146. Boolean indicating whether conformance was found
  147. """
  148. root = self.state.read_root()
  149. type_model_node = self.state.read_dict(root, type_model_name)
  150. if type_model_node is None:
  151. raise RuntimeError(f"No type model with name {type_model_name} found.")
  152. model_node = self.state.read_dict(root, model_name)
  153. if model_node is None:
  154. raise RuntimeError(f"No model with name {model_node} found.")
  155. types = self.state.read_outgoing(model_node)
  156. types = [self.state.read_edge(e)[1] for e in types]
  157. if type_model_node not in types:
  158. conf = Conformance(self.state,
  159. UUID(self.state.read_value(model_node)),
  160. UUID(self.state.read_value(type_model_node))).check_structural(log=True)
  161. if conf:
  162. self.state.create_edge(model_node, type_model_node)
  163. return conf
  164. else:
  165. return Conformance(self.state,
  166. UUID(self.state.read_value(model_node)),
  167. UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
  168. def dump_state(self):
  169. """
  170. Dumps the current state of the Modelverse to a pickle file
  171. """
  172. import pickle
  173. with open("state.p", "wb") as file:
  174. pickle.dump(self.state, file)
  175. def load_state(self):
  176. """
  177. Loas a state of the Modelverse from a pickle file
  178. """
  179. import pickle
  180. with open("state.p", "rb") as file:
  181. self.state = pickle.load(file)
  182. if __name__ == '__main__':
  183. from state.devstate import DevState
  184. s = DevState()
  185. m = Manager(s)
  186. m.select_model("SCD")
  187. m.select_context("SCD")
  188. for f in m.get_services():
  189. print(f)