123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import copy
- import pickle
- from api.od import ODAPI
- from examples.ftg_pm_pt.helpers.composite_activity import execute_composite_workflow
- def serialize(obj):
- return pickle.dumps(obj)
- def deserialize(obj):
- return pickle.loads(obj)
- def create_activity_links(od: ODAPI, activity, prev_element, ctrl_port, end_trace=None,
- relation_type="pt_IsFollowedBy"):
- od.create_link(None, "pt_RelatesTo", activity, ctrl_port)
- od.create_link(None, relation_type, prev_element, activity)
- if end_trace:
- od.create_link(None, "pt_IsFollowedBy", activity, end_trace)
- def extract_input_data(od: ODAPI, activity):
- input_data = {}
- for has_data_in in od.get_outgoing(activity, "pm_HasDataIn"):
- data_port = od.get_target(has_data_in)
- artefact_state = od.get_source(od.get_incoming(od.get_source(od.get_incoming(data_port, "pm_DataFlowOut")[0]), "pm_Of")[0])
- input_data[od.get_name(data_port)] = deserialize(od.get_slot_value(artefact_state, "data"))
- return input_data
- def execute_activity(od: ODAPI, globs, activity, input_data):
- inp = copy.deepcopy(input_data) # Necessary, otherwise the function changes the values inside the dictionary -> need the original values for process trace
- func = globs[od.get_slot_value(activity, "func")]
- return func(inp) if func.__code__.co_argcount > 0 else func()
- def handle_artefact(od: ODAPI, activity, artefact_type, relation_type, data_port=None, data=None,
- direction="DataFlowIn"):
- artefact = od.create_object(None, "pt_Artefact")
- if 'pt_Consumes' == relation_type:
- od.create_link(None, relation_type, artefact, activity)
- else:
- od.create_link(None, relation_type, activity, artefact)
- if data_port:
- flow_direction = od.get_incoming if relation_type == 'pt_Consumes' else od.get_outgoing
- ass_side = od.get_source if relation_type == 'pt_Consumes' else od.get_target
- pm_artefact = ass_side(flow_direction(data_port, f"pm_{direction}")[0])
- prev_artefact = find_previous_artefact(od, od.get_incoming(pm_artefact, "pt_BelongsTo"))
- if prev_artefact:
- od.create_link(None, "pt_PrevVersion", artefact, prev_artefact)
- od.create_link(None, "pt_BelongsTo", artefact, pm_artefact)
- if data is not None:
- artefact_state = od.get_source(od.get_incoming(pm_artefact, "pm_Of")[0])
- od.set_slot_value(artefact_state, "data", serialize(data))
- od.set_slot_value(artefact, "data", serialize(data))
- def find_previous_artefact(od: ODAPI, linked_artefacts):
- return next((od.get_source(link) for link in linked_artefacts if
- not od.get_incoming(od.get_source(link), "pt_PrevVersion")), None)
- def update_control_states(od: ODAPI, activity, ctrl_out):
- for has_ctrl_in in od.get_outgoing(activity, "pm_HasCtrlIn"):
- od.set_slot_value(od.get_source(od.get_incoming(od.get_target(has_ctrl_in), "pm_Of")[0]), "active", False)
- od.set_slot_value(od.get_source(od.get_incoming(ctrl_out, "pm_Of")[0]), "active", True)
|