manager.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. from state.base import State
  2. from bootstrap.scd import bootstrap_scd
  3. from bootstrap.pn import bootstrap_pn
  4. from services import implemented as services
  5. from framework.conformance import Conformance
  6. from uuid import UUID
  7. class Manager:
  8. def __init__(self, state: State):
  9. self.current_model = None
  10. self.current_context = None
  11. self.state = state
  12. bootstrap_scd(state)
  13. # bootstrap_pn(state, "PN")
  14. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  15. for key_node in self.state.read_dict_keys(self.state.read_root()):
  16. model_node = self.state.read_dict_node(self.state.read_root(), key_node)
  17. self.state.create_edge(model_node, scd_node)
  18. def get_models(self):
  19. """
  20. Retrieves all existing models
  21. Returns:
  22. Names of exising models
  23. """
  24. for key_node in self.state.read_dict_keys(self.state.read_root()):
  25. yield self.state.read_value(key_node)
  26. def instantiate_model(self, type_model_name: str, name: str):
  27. """
  28. Retrieves all existing models
  29. Args:
  30. type_model_name: name of the type model we want to instantiate
  31. name: name of the instance model to be created
  32. Returns:
  33. Nothing
  34. """
  35. root = self.state.read_root()
  36. type_model_node = self.state.read_dict(root, type_model_name)
  37. if type_model_node == None:
  38. raise RuntimeError(f"No type model with name {type_model_name} found.")
  39. else:
  40. # check if model is a linguistic type model
  41. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  42. incoming = self.state.read_incoming(scd_node)
  43. incoming = [self.state.read_edge(e)[0] for e in incoming]
  44. if type_model_node not in incoming:
  45. raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
  46. if name in map(self.state.read_value, self.state.read_dict_keys(root)):
  47. raise RuntimeError(f"Model with name {name} already exists.")
  48. new_model_root = self.state.create_node()
  49. new_model_node = self.state.create_nodevalue(str(new_model_root))
  50. self.state.create_dict(root, name, new_model_node)
  51. self.state.create_edge(new_model_node, type_model_node)
  52. self.current_model = (name, new_model_root)
  53. if type_model_name not in services:
  54. raise RuntimeError(f"Services for type {type_model_name} not implemented.")
  55. self.current_context = services[type_model_name](self.current_model[1], self.state)
  56. def select_model(self, name: str):
  57. """
  58. Select a model to interact with
  59. Args:
  60. name: name of the model we want to interact with
  61. Returns:
  62. Nothing
  63. """
  64. root = self.state.read_root()
  65. model_node = self.state.read_dict(root, name)
  66. if model_node == None:
  67. raise RuntimeError(f"No model with name {name} found.")
  68. model_root = UUID(self.state.read_value(model_node))
  69. self.current_model = (name, model_root)
  70. def close_model(self):
  71. """
  72. Clear the currently selected model
  73. Returns:
  74. Nothing
  75. """
  76. self.current_model = None
  77. self.current_context = None
  78. def get_types(self):
  79. """
  80. Retrieve the types of the currently selected model
  81. Returns:
  82. Names of the model's types
  83. """
  84. root = self.state.read_root()
  85. if self.current_model == None:
  86. raise RuntimeError(f"No model currently selected.")
  87. name, model = self.current_model
  88. model_id = self.state.read_dict(root, name)
  89. outgoing = self.state.read_outgoing(model_id)
  90. outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
  91. elements = [self.state.read_edge(e)[1] for e in outgoing]
  92. for e in elements:
  93. incoming = self.state.read_incoming(e)
  94. label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
  95. label_edge, = self.state.read_outgoing(label_edge)
  96. _, label_node = self.state.read_edge(label_edge)
  97. yield self.state.read_value(label_node)
  98. def select_context(self, name: str):
  99. """
  100. Select a type to set as the current context
  101. Args:
  102. name: name of the type/context
  103. Returns:
  104. Nothing
  105. """
  106. if name not in self.get_types():
  107. raise RuntimeError(f"No type {name} that currently selected model conforms to.")
  108. if name not in services:
  109. raise RuntimeError(f"Services for type {name} not implemented.")
  110. self.current_context = services[name](self.current_model[1], self.state)
  111. self.current_context.from_bottom()
  112. def close_context(self):
  113. """
  114. Exit the current (type) context
  115. Returns:
  116. Nothing
  117. """
  118. self.current_context.to_bottom()
  119. self.current_context = None
  120. def get_services(self):
  121. """
  122. Retrieve the services available in the current context
  123. Returns:
  124. Functions exposed by the current context's implementation
  125. """
  126. if self.current_model == None:
  127. raise RuntimeError(f"No model currently selected.")
  128. if self.current_context == None:
  129. raise RuntimeError(f"No context currently selected.")
  130. yield from [
  131. getattr(self.current_context, func)
  132. for func in dir(self.current_context)
  133. if callable(getattr(self.current_context, func))
  134. and not func.startswith("__")
  135. and not func == "from_bottom"
  136. and not func == "to_bottom"
  137. ]
  138. def check_conformance(self, type_model_name: str, model_name: str):
  139. """
  140. If there are existing morphisms between the model and type model
  141. check nominal conformance
  142. Else
  143. find conformance using structural conformance check
  144. Args:
  145. type_model_name: name of the type model to check conformance against
  146. model_name: name of the instance model
  147. Returns:
  148. Boolean indicating whether conformance was found
  149. """
  150. root = self.state.read_root()
  151. type_model_node = self.state.read_dict(root, type_model_name)
  152. if type_model_node == None:
  153. raise RuntimeError(f"No type model with name {type_model_name} found.")
  154. model_node = self.state.read_dict(root, model_name)
  155. if model_node == None:
  156. raise RuntimeError(f"No model with name {model_node} found.")
  157. types = self.state.read_outgoing(model_node)
  158. types = [self.state.read_edge(e)[1] for e in types]
  159. # if type_model_node not in types:
  160. if True:
  161. print("checking structural conformance")
  162. conf = Conformance(self.state,
  163. UUID(self.state.read_value(model_node)),
  164. UUID(self.state.read_value(type_model_node))).check_structural(log=True)
  165. if conf:
  166. self.state.create_edge(model_node, type_model_node)
  167. return conf
  168. else:
  169. print("checking nominal conformance")
  170. return Conformance(self.state,
  171. UUID(self.state.read_value(model_node)),
  172. UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
  173. def dump_state(self):
  174. """
  175. Dumps the current state of the Modelverse to a pickle file
  176. """
  177. import pickle
  178. with open("state.p", "wb") as file:
  179. pickle.dump(self.state, file)
  180. def load_state(self):
  181. """
  182. Loas a state of the Modelverse from a pickle file
  183. """
  184. import pickle
  185. with open("state.p", "rb") as file:
  186. self.state = pickle.load(file)
  187. def to_graphviz(self):
  188. self.state.dump("state.dot")
  189. if __name__ == '__main__':
  190. from state.devstate import DevState
  191. s = DevState()
  192. m = Manager(s)
  193. m.select_model("SCD")
  194. m.select_context("SCD")
  195. for f in m.get_services():
  196. print(f)