# manager.py
import pickle
from uuid import UUID

from state.base import State
from bootstrap.scd import bootstrap_scd
from services import implemented as services
from framework.conformance import Conformance
  6. class Manager:
  7. def __init__(self, state: State):
  8. self.current_model = None
  9. self.current_context = None
  10. self.state = state
  11. bootstrap_scd(state)
  12. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  13. for key_node in self.state.read_dict_keys(self.state.read_root()):
  14. model_node = self.state.read_dict_node(self.state.read_root(), key_node)
  15. self.state.create_edge(model_node, scd_node)
  16. def get_models(self):
  17. for key_node in self.state.read_dict_keys(self.state.read_root()):
  18. yield self.state.read_value(key_node)
  19. def instantiate_model(self, type_model_name: str, name: str):
  20. root = self.state.read_root()
  21. type_model_node = self.state.read_dict(root, type_model_name)
  22. if type_model_node is None:
  23. raise RuntimeError(f"No type model with name {type_model_name} found.")
  24. else:
  25. # check if model is a linguistic type model
  26. scd_node = self.state.read_dict(self.state.read_root(), "SCD")
  27. incoming = self.state.read_incoming(scd_node)
  28. incoming = [self.state.read_edge(e)[0] for e in incoming]
  29. if type_model_node not in incoming:
  30. raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
  31. if name in map(self.state.read_value, self.state.read_dict_keys(root)):
  32. raise RuntimeError(f"Model with name {name} already exists.")
  33. new_model_root = self.state.create_node()
  34. new_model_node = self.state.create_nodevalue(str(new_model_root))
  35. self.state.create_dict(root, name, new_model_node)
  36. self.state.create_edge(new_model_node, type_model_node)
  37. self.current_model = (name, new_model_root)
  38. if type_model_name not in services:
  39. raise RuntimeError(f"Services for type {type_model_name} not implemented.")
  40. self.current_context = services[type_model_name](self.current_model[1], self.state)
  41. def select_model(self, name: str):
  42. root = self.state.read_root()
  43. model_node = self.state.read_dict(root, name)
  44. if model_node is None:
  45. raise RuntimeError(f"No model with name {name} found.")
  46. model_root = UUID(self.state.read_value(model_node))
  47. self.current_model = (name, model_root)
  48. def close_model(self):
  49. self.current_model = None
  50. self.current_context = None
  51. def get_types(self):
  52. root = self.state.read_root()
  53. if self.current_model is None:
  54. raise RuntimeError(f"No model currently selected.")
  55. name, model = self.current_model
  56. model_id = self.state.read_dict(root, name)
  57. outgoing = self.state.read_outgoing(model_id)
  58. outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
  59. elements = [self.state.read_edge(e)[1] for e in outgoing]
  60. for e in elements:
  61. incoming = self.state.read_incoming(e)
  62. label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
  63. label_edge, = self.state.read_outgoing(label_edge)
  64. _, label_node = self.state.read_edge(label_edge)
  65. yield self.state.read_value(label_node)
  66. def select_context(self, name: str):
  67. if name not in self.get_types():
  68. raise RuntimeError(f"No type {name} that currently selected model conforms to.")
  69. if name not in services:
  70. raise RuntimeError(f"Services for type {name} not implemented.")
  71. self.current_context = services[name](self.current_model[1], self.state)
  72. self.current_context.from_bottom()
  73. def close_context(self):
  74. self.current_context.to_bottom()
  75. self.current_context = None
  76. def get_services(self):
  77. if self.current_model is None:
  78. raise RuntimeError(f"No model currently selected.")
  79. if self.current_context is None:
  80. raise RuntimeError(f"No context currently selected.")
  81. yield from [
  82. getattr(self.current_context, func)
  83. for func in dir(self.current_context)
  84. if callable(getattr(self.current_context, func))
  85. and not func.startswith("__")
  86. and not func == "from_bottom"
  87. and not func == "to_bottom"
  88. ]
  89. def check_conformance(self, type_model_name: str, model_name: str):
  90. root = self.state.read_root()
  91. type_model_node = self.state.read_dict(root, type_model_name)
  92. if type_model_node is None:
  93. raise RuntimeError(f"No type model with name {type_model_name} found.")
  94. model_node = self.state.read_dict(root, model_name)
  95. if model_node is None:
  96. raise RuntimeError(f"No model with name {model_node} found.")
  97. types = self.state.read_outgoing(model_node)
  98. types = [self.state.read_edge(e)[1] for e in types]
  99. if type_model_node not in types:
  100. conf = Conformance(self.state,
  101. UUID(self.state.read_value(model_node)),
  102. UUID(self.state.read_value(type_model_node))).check_structural(log=True)
  103. if conf:
  104. self.state.create_edge(model_node, type_model_node)
  105. return conf
  106. else:
  107. return Conformance(self.state,
  108. UUID(self.state.read_value(model_node)),
  109. UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
  110. def dump_state(self):
  111. import pickle
  112. with open("state.p", "wb") as file:
  113. pickle.dump(self.state, file)
  114. def load_state(self):
  115. import pickle
  116. with open("state.p", "rb") as file:
  117. self.state = pickle.load(file)
  118. if __name__ == '__main__':
  119. from state.devstate import DevState
  120. s = DevState()
  121. m = Manager(s)
  122. m.select_model("SCD")
  123. m.select_context("SCD")
  124. for f in m.get_services():
  125. print(f)