help_functions.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import copy
  2. import pickle
  3. from api.od import ODAPI
  4. from examples.ftg_pm_pt.helpers.composite_activity import execute_composite_workflow
  5. def serialize(obj):
  6. return pickle.dumps(obj)
  7. def deserialize(obj):
  8. return pickle.loads(obj)
  9. def create_activity_links(od: ODAPI, activity, prev_element, ctrl_port, end_trace=None,
  10. relation_type="pt_IsFollowedBy"):
  11. od.create_link(None, "pt_RelatesTo", activity, ctrl_port)
  12. od.create_link(None, relation_type, prev_element, activity)
  13. if end_trace:
  14. od.create_link(None, "pt_IsFollowedBy", activity, end_trace)
  15. def extract_input_data(od: ODAPI, activity):
  16. input_data = {}
  17. for has_data_in in od.get_outgoing(activity, "pm_HasDataIn"):
  18. data_port = od.get_target(has_data_in)
  19. artefact_state = od.get_source(od.get_incoming(od.get_source(od.get_incoming(data_port, "pm_DataFlowOut")[0]), "pm_Of")[0])
  20. input_data[od.get_name(data_port)] = deserialize(od.get_slot_value(artefact_state, "data"))
  21. return input_data
  22. def execute_activity(od: ODAPI, globs, activity, input_data):
  23. inp = copy.deepcopy(input_data) # Necessary, otherwise the function changes the values inside the dictionary -> need the original values for process trace
  24. func = globs[od.get_slot_value(activity, "func")]
  25. return func(inp) if func.__code__.co_argcount > 0 else func()
  26. def handle_artefact(od: ODAPI, activity, artefact_type, relation_type, data_port=None, data=None,
  27. direction="DataFlowIn"):
  28. artefact = od.create_object(None, "pt_Artefact")
  29. if 'pt_Consumes' == relation_type:
  30. od.create_link(None, relation_type, artefact, activity)
  31. else:
  32. od.create_link(None, relation_type, activity, artefact)
  33. if data_port:
  34. flow_direction = od.get_incoming if relation_type == 'pt_Consumes' else od.get_outgoing
  35. ass_side = od.get_source if relation_type == 'pt_Consumes' else od.get_target
  36. pm_artefact = ass_side(flow_direction(data_port, f"pm_{direction}")[0])
  37. prev_artefact = find_previous_artefact(od, od.get_incoming(pm_artefact, "pt_BelongsTo"))
  38. if prev_artefact:
  39. od.create_link(None, "pt_PrevVersion", artefact, prev_artefact)
  40. od.create_link(None, "pt_BelongsTo", artefact, pm_artefact)
  41. if data is not None:
  42. artefact_state = od.get_source(od.get_incoming(pm_artefact, "pm_Of")[0])
  43. od.set_slot_value(artefact_state, "data", serialize(data))
  44. od.set_slot_value(artefact, "data", serialize(data))
  45. def find_previous_artefact(od: ODAPI, linked_artefacts):
  46. return next((od.get_source(link) for link in linked_artefacts if
  47. not od.get_incoming(od.get_source(link), "pt_PrevVersion")), None)
  48. def update_control_states(od: ODAPI, activity, ctrl_out):
  49. for has_ctrl_in in od.get_outgoing(activity, "pm_HasCtrlIn"):
  50. od.set_slot_value(od.get_source(od.get_incoming(od.get_target(has_ctrl_in), "pm_Of")[0]), "active", False)
  51. od.set_slot_value(od.get_source(od.get_incoming(ctrl_out, "pm_Of")[0]), "active", True)