123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- print("SCCD PythonPDEVS CLI interface")
- import sys
- import time
- from pprint import pprint
- sys.path.append("../src/")
- from sccd.runtime.statecharts_core import Event
- import simulator
- from ps_model import Root
- controller = simulator.Controller(Root())
- reply_port = controller.addOutputListener('reply')
- def set_defaults(inp, defaultlist):
- for i, v in enumerate(defaultlist):
- if len(inp) == i + 1:
- inp.append(v)
- def get_bool(val):
- if val.lower() in ["true", "yes", "1"]:
- return True
- else:
- return False
- def poll_port(port):
- while 1:
- try:
- event = port.fetch(-1)
- print("%s" % event.getName())
- pprint(event.getParameters())
- except:
- pass
- def generate_input():
- print("READY for input")
- print("Type 'help' for information")
- while 1:
- inp = raw_input().split(" ")
- action = inp[0]
- if inp[0] == "simulate":
- set_defaults(inp, ['inf'])
- params = [{"termination_time": float(inp[1])}]
- elif inp[0] == "big_step":
- set_defaults(inp, ['inf'])
- params = [{"termination_time": float(inp[1])}]
- elif inp[0] == "realtime":
- set_defaults(inp, ['inf', 1.0])
- params = [{"termination_time": float(inp[1]), "realtime_scale": float(inp[2])}]
- elif inp[0] == "small_step":
- set_defaults(inp, ['inf'])
- params = [{"termination_time": float(inp[1])}]
- elif inp[0] == "god_event":
- if len(inp) != 4:
- print("God Events require 3 parameters!")
- continue
- params = [{"model": inp[1], "attribute": inp[2], "value": inp[3]}]
- elif inp[0] == "inject":
- if len(inp) != 4:
- print("Injects require 3 parameters!")
- continue
- params = [{"time": float(inp[1]), "port": inp[2], "event": inp[3]}]
- elif inp[0] == "trace":
- set_defaults(inp, [None])
- params = [inp[1]]
- elif inp[0] == "pause":
- params = []
- elif inp[0] == "exit" or inp[0] == "quit":
- break
- elif inp[0] == "add_breakpoint":
- if len(inp) < 5:
- print("Breakpoint removal requires 2 parameters!")
- continue
- # Merge together the final part again
- inp = [inp[0], inp[1], " ".join(inp[2:-2]).replace("\\n", "\n"), get_bool(inp[-2]), get_bool(inp[-1])]
- print("Generated parameters: " + str(inp))
- params = inp[1:]
- elif inp[0] == "del_breakpoint":
- if len(inp) < 2:
- print("Breakpoint removal requires 1 parameter!")
- continue
- params = [inp[1]]
- elif inp[0] == "enable_breakpoint":
- action = "toggle_breakpoint"
- params = [inp[1], True]
- elif inp[0] == "disable_breakpoint":
- action = "toggle_breakpoint"
- params = [inp[1], False]
- elif inp[0] == "reset":
- params = []
- elif inp[0] == "backwards_step":
- params = []
- elif inp[0] == "help":
- print("Supported operations:")
- print(" simulate [termination_time]")
- print(" --> Simulate until termination time is reached")
- print(" big_step [termination_time]")
- print(" --> Simulate a single step, unless termination time is reached")
- print(" small_step [termination_time]")
- print(" --> Simulate a single internal simulation step")
- print(" Termination time is ONLY checked at the")
- print(" check_termination phase")
- print(" backwards_step")
- print(" --> Step back to the previous time")
- print(" realtime [termination_time [realtime_scale]]")
- print(" --> Simulate in realtime until simulation time is reached")
- print(" realtime_scale can speed up or slow down the pace")
- print(" god_event model_name attribute_name new_value")
- print(" --> Modify the internal state of an arbitrary model")
- print(" model_name should be the fully qualified name")
- print(" attribute_name is the name of the attribute to alter")
- print(" new_value is the value to assign")
- print(" new_value can only be a string due to string-only input")
- print(" inject time port_name event")
- print(" --> Put a user-defined event on an input port")
- print(" port_name should be a fully qualified port name")
- print(" event should be the event to put on it, string only")
- print(" trace [filename]")
- print(" --> Write out trace information to the specified file.")
- print(" Leave empty to disable tracing.")
- print(" add_breakpoint id function enabled disable_on_trigger")
- print(" --> Add a breakpoint that will pause simulation when function returns True")
- print(" the function should contain a definition of the 'breakpoint' function")
- print(" and must take 3 parameters: time, model and transitioned")
- print(" enabled indicates whether or not the breakpoint should be checked")
- print(" disable_on_trigger determines if the breakpoint should auto-disable")
- print(" after being triggered")
- print(" del_breakpoint id")
- print(" --> Remove a breakpoint")
- print(" enable_breakpoint id")
- print(" --> Enable the provided breakpoint")
- print(" disable_breakpoint id")
- print(" --> Disable the provided breakpoint")
- print(" reset")
- print(" --> Reset the simulation")
- print(" exit")
- print(" --> Stop the client")
- print(" pause")
- print(" --> Pause the simulation")
- print(" quit")
- print(" --> Stop the client")
- print("")
- print("Defaults: ")
- print(" termination_time --> 'inf'")
- print(" realtime_scale --> 1.0")
- continue
- else:
- print("Command not understood: " + str(inp))
- continue
- controller.addInput(Event(action, "request", params))
-
- import threading
- poll_thrd = threading.Thread(target=poll_port,args=[reply_port])
- poll_thrd.daemon = True
- poll_thrd.start()
- inp_thrd = threading.Thread(target=generate_input)
- inp_thrd.daemon = True
- inp_thrd.start()
-
- try:
- controller.start()
- finally:
- controller.stop()
|