| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 |
- import sys
- import time
- import threading
- from . import naivelog
- from .depGraph import createDepGraph
- from .solver import GaussianJordanLinearSolver
- from .realtime.threadingBackend import ThreadingBackend, Platform
- from .util import PYTHON_VERSION
- _TQDM_FOUND = True
- try:
- from tqdm import tqdm
- except ImportError:
- _TQDM_FOUND = False
- class Clock:
- """
- The clock of the simulation.
- Args:
- delta_t (float): Delay in-between timesteps in the simulation.
- time (float): The time to start the clock at.
- """
- def __init__(self, delta_t, time=0.0):
- self.__delta_t = delta_t
- self.__time = time
- self.__start_time = time
- def getTime(self):
- """
- Gets the current simulation time.
- """
- return self.__time
- def setTime(self, time):
- self.__time = time
- def getStartTime(self):
- """
- Gets the starting simulation time.
- """
- return self.__start_time
- def step(self):
- """
- Executes a timestep on the simulation.
- """
- self.__time = self.__time + self.__delta_t
- def _rewind(self):
- """
- Rewinds the simulation clock to the previous iteration, assuming the delta
- has not been changed.
- Danger:
- Normally, this function should only be used by the internal mechanisms
- of the CBD simulator, **not** by a user. Using this function without a
- full understanding of the simulator may result in undefined behaviour.
- """
- self.__time = self.__time - self.__delta_t
- def setDeltaT(self, new_delta_t):
- """
- Sets the delta in-between timesteps.
- Args:
- new_delta_t (float): The new delta.
- """
- self.__delta_t = new_delta_t
- def getDeltaT(self):
- """
- Obtains the current delta.
- """
- return self.__delta_t
- class Simulator:
- """
- Simulator for a CBD model. Allows for execution of the simulation.
- This class implements the semantics of CBDs.
- Args:
- model (CBD): A :class:`CBD` model to simulate.
- """
- def __init__(self, model):
- self.model = model
- self.__deltaT = 1.0
- self.__realtime = False
- self.__finished = True
- self.__stop_requested = False
- # scale of time in the simulation.
- self.__realtime_scale = 1.0
- # maximal amount of events with delay 0
- self.__realtime_counter_max = 100
- # current amount of events
- self.__realtime_counter = self.__realtime_counter_max
- # Starting time of the simulation
- self.__realtime_start_time = 0.0
- self.__termination_time = float('inf')
- self.__termination_condition = None
- # simulation data [dep graph, strong components, curIt]
- self.__sim_data = [None, None, 0]
- self.__stepsize_backend = Fixed(self.__deltaT)
- self.__deltas = []
- self.__threading_backend = None
- self.__threading_backend_subsystem = Platform.PYTHON
- self.__threading_backend_args = []
- self.__progress = False
- self.__progress_event = None
- self.__progress_finished = True
- self.__logger = naivelog.getLogger("CBD")
- self.__duration_log = []
- self.__lasttime = None
- self.__events = {
- "started": [],
- "finished": []
- }
-
- # TODO: make this variable, given more solver implementations
- self.__solver = GaussianJordanLinearSolver(self.__logger)
- def setBackend(self, back):
- self.__stepsize_backend = back
- def run(self, term_time=None, start_time=0.0):
- """
- Simulates the model.
- Args:
- term_time (float): When not :code:`None`, overwrites the
- termination time with the new value.
- start_time (float): The time at which to start the simulation.
- I.e. at the beginning, this amount of
- time has passed. Defaults to 0.
- """
- self.__finished = False
- self.__stop_requested = False
- self.model.setClock(Clock(self.getDeltaT(), start_time))
- if term_time is not None:
- self.__termination_time = term_time
- self.__sim_data = [None, None, 0]
- self.__duration_log = [] # for execution statistics
- self.__progress_finished = False
- if self.__threading_backend is None:
- # If there is still a backend, it is the same, so keep it!
- self.__threading_backend = ThreadingBackend(self.__threading_backend_subsystem,
- self.__threading_backend_args)
- if _TQDM_FOUND and self.__progress and self.__termination_time < float('inf'):
- # Setup progress bar if possible
- thread = threading.Thread(target=self.__progress_update)
- thread.daemon = True
- thread.start()
- if self.__realtime:
- self.__realtime_start_time = time.time() - start_time
- self.__lasttime = start_time
- self.signal("started")
- if self.__realtime:
- self.__threading_backend.wait(0.0, self.__runsim)
- else:
- self.__runsim()
- def __finish(self):
- """
- Terminate the simulation.
- """
- self.__finished = True
- if not self.__progress:
- # Whenever the progress bar is initialized, wait until it ends
- self.__progress_finished = True
- self.signal("finished")
- def __check(self):
- """
- Checks if the simulation still needs to continue.
- This is done based on the termination time and condition.
- Returns:
- :code:`True` if the simulation needs to be terminated and
- :code:`False` otherwise.
- """
- ret = self.__stop_requested
- if self.__termination_condition is not None:
- ret = self.__termination_condition(self.model, self.__sim_data[2])
- return ret or self.__termination_time <= self.getTime()
- def stop(self):
- """
- Requests a termination of the current running simulation.
- """
- self.__stop_requested = True
- def is_running(self):
- """
- Returns :code:`True` as long as the simulation is running.
- This is a convenience function to keep real-time simulations
- alive, or to interact from external sources.
- """
- return not self.__progress_finished and not self.__finished
- def getClock(self):
- """
- Gets the simulation clock.
- See Also:
- - :func:`getTime`
- - :func:`getRelativeTime`
- - :func:`getDeltaT`
- - :func:`setDeltaT`
- - :class:`Clock`
- """
- return self.model.getClock()
- def getTime(self):
- """
- Gets the current simulation time.
- See Also:
- - :func:`getClock`
- - :func:`getRelativeTime`
- - :func:`getDeltaT`
- - :func:`setDeltaT`
- - :class:`Clock`
- """
- return self.getClock().getTime()
- def getRelativeTime(self):
- """
- Gets the current simulation time, ignoring a starting offset.
- See Also:
- - :func:`getClock`
- - :func:`getTime`
- - :func:`getDeltaT`
- - :func:`setDeltaT`
- - :class:`Clock`
- """
- return self.getClock().getTime() - self.getClock().getStartTime()
- def setDeltaT(self, delta_t):
- """
- Sets the delta in-between iteration steps.
- Args:
- delta_t (float): The delta.
- See Also:
- - :func:`getClock`
- - :func:`getTime`
- - :func:`getRelativeTime`
- - :func:`getDeltaT`
- - :class:`Clock`
- """
- self.__deltaT = delta_t
- clock = self.getClock()
- if clock is not None:
- clock.setDeltaT(delta_t)
- self.__stepsize_backend.delta_t = delta_t
- def getDeltaT(self):
- """
- Gets the delta in-between iteration steps.
- See Also:
- - :func:`getClock`
- - :func:`getTime`
- - :func:`getRelativeTime`
- - :func:`setDeltaT`
- - :class:`Clock`
- """
- return self.__deltaT
- def setSimData(self, data):
- self.__sim_data = data
- def setRealTime(self, enabled=True, scale=1.0):
- """
- Makes the simulation run in (scaled) real time.
- Args:
- enabled (bool): When :code:`True`, realtime simulation will be enabled.
- Otherwise, it will be disabled. Defaults to :code:`True`.
- scale (float): Optional scaling for the simulation time. When greater
- than 1, the simulation will run slower than the actual
- time. When < 1, it will run faster.
- E.g. :code:`scale = 2.0` will run twice as long.
- Defaults to :code:`1.0`.
- """
- self.__realtime = enabled
- # Scale of 2 => twice as long
- self.__realtime_scale = scale
- def setProgressBar(self, enabled=True):
- """
- Use the `tqdm <https://tqdm.github.io/>`_ package to display a progress bar
- of the simulation.
- Args:
- enabled (bool): Whether or not to enable/disable the progress bar.
- Defaults to :code:`True` (= show progress bar).
- Raises:
- AssertionError: if the :code:`tqdm` module cannot be located.
- """
- assert _TQDM_FOUND, "Module tqdm not found. Progressbar is not possible."
- self.__progress = enabled
- def setTerminationCondition(self, func):
- """
- Sets the system's termination condition.
- Args:
- func: A function that takes the model and the current iteration as input
- and produces :code:`True` if the simulation needs to terminate.
- Note:
- When set, the progress bars (see :func:`setProgressBar`) may not work as intended.
- See Also:
- :func:`setTerminationTime`
- """
- # TODO: allow termination condition to set progressbar update value?
- self.__termination_condition = func
- def setTerminationTime(self, term_time):
- """
- Sets the termination time of the system.
- Args:
- term_time (float): Termination time for the simulation.
- """
- self.__termination_time = term_time
- def setRealTimePlatform(self, subsystem, *args):
- """
- Sets the realtime platform to a platform of choice.
- This allows more complex/efficient simulations.
- Calling this function automatically sets the simulation to realtime.
- Args:
- subsystem (Platform): The platform to use.
- args: Optional arguments for this platform.
- Currently, only the TkInter platform
- makes use of these arguments.
- Note:
- To prevent misuse of the function, please use one of the wrapper
- functions when you have no idea what you're doing.
- See Also:
- - :func:`setRealTimePlatformThreading`
- - :func:`setRealTimePlatformTk`
- - :func:`setRealTimePlatformGameLoop`
- """
- self.setRealTime(True)
- self.__threading_backend = None
- self.__threading_backend_subsystem = subsystem
- self.__threading_backend_args = args
- def setRealTimePlatformThreading(self):
- """
- Wrapper around the :func:`setRealTimePlatform` call to automatically
- set the Python Threading backend.
- Calling this function automatically sets the simulation to realtime.
- See Also:
- - :func:`setRealTimePlatform`
- - :func:`setRealTimePlatformTk`
- - :func:`setRealTimePlatformGameLoop`
- """
- self.setRealTimePlatform(Platform.THREADING)
- def setRealTimePlatformGameLoop(self):
- """
- Wrapper around the :func:`setRealTimePlatform` call to automatically
- set the Game Loop backend. Using this backend, it is expected the user
- will periodically call the :func:`realtime_gameloop_call` method to
- update the simulation step. Timing is still maintained internally.
- Calling this function automatically sets the simulation to realtime.
- See Also:
- - :func:`setRealTimePlatform`
- - :func:`setRealTimePlatformThreading`
- - :func:`setRealTimePlatformTk`
- - :func:`realtime_gameloop_call`
- - :doc:`examples/RealTime`
- """
- self.setRealTimePlatform(Platform.GAMELOOP)
- def setRealTimePlatformTk(self, root):
- """
- Wrapper around the :func:`setRealTimePlatform` call to automatically
- set the TkInter backend.
- Calling this function automatically sets the simulation to realtime.
- Args:
- root: TkInter root window object (tkinter.Tk)
- See Also:
- - :func:`setRealTimePlatform`
- - :func:`setRealTimePlatformThreading`
- - :func:`setRealTimePlatformGameLoop`
- """
- self.setRealTimePlatform(Platform.TKINTER, root)
- def realtime_gameloop_call(self, time=None):
- """
- Do a step in the realtime-gameloop platform.
- Args:
- time (float): Simulation time to be passed on. Only to be used
- for the alternative gameloop backend.
- Note:
- This function will only work for a :attr:`Platform.GAMELOOP` or a
- :attr:`Platform.GLA` simulation, after the :func:`run` method has
- been called.
- See Also:
- - :func:`setRealTimePlatform`
- - :func:`setRealTimePlatformGameLoop`
- - :func:`run`
- """
- self.__threading_backend.step(time)
- def do_single_step(self):
- curIt = self.__sim_data[2]
- # Efficiency reasons: dep graph only changes at these times
- # in the given set of library blocks.
- # TODO: Must be set to "every time" instead.
- if curIt < 2 or self.__sim_data[0] is None:
- self.__sim_data[0] = createDepGraph(self.model, curIt)
- self.__sim_data[1] = self.__sim_data[0].getStrongComponents(curIt)
- self.__computeBlocks(self.__sim_data[1], self.__sim_data[0], self.__sim_data[2])
- self.__sim_data[2] += 1
- def update_clock(self):
- if self.__sim_data[2] > 0:
- new_dt = self.__stepsize_backend.getNextStepSize(self)
- self.setDeltaT(new_dt)
- self.__deltas.append(new_dt)
- self.getClock().step()
- def step_back(self):
- """
- Rewinds the simulator to the previous iteration.
- Danger:
- Normally, this function should only be used by the internal mechanisms
- of the CBD simulator, **not** by a user. Using this function without a
- full understanding of the simulator may result in undefined behaviour.
- """
- self.getClock()._rewind()
- self.model._rewind()
- self.__sim_data[2] -= 1
- def getDurationLog(self):
- """
- Get the list of timings for every iteration.
- Warning:
- This function is temporary and will be removed in the future.
- """
- return self.__duration_log
- def getDeltaLog(self):
- return self.__deltas
- def __realtimeWait(self):
- """
- Wait until next realtime event.
- Returns:
- :code:`True` if a simulation stop is required and
- :code:`False` otherwise.
- """
- current_time = time.time() - self.__realtime_start_time
- next_sim_time = min(self.__termination_time, self.__lasttime + self.getDeltaT())
- # Scaled Time
- next_sim_time *= self.__realtime_scale
- # Subtract the time that we already did our computation
- wait_time = next_sim_time - current_time
- self.__lasttime = next_sim_time / self.__realtime_scale
- if wait_time <= 0.0:
- # event is overdue => force execute
- self.__realtime_counter -= 1
- if self.__realtime_counter < 0:
- # Too many overdue events at a time
- self.__realtime_counter = self.__realtime_counter_max
- self.__threading_backend.wait(0.01, self.__runsim)
- return True
- return False
- self.__realtime_counter = self.__realtime_counter_max
- self.__threading_backend.wait(wait_time, self.__runsim)
- return True
- def __runsim(self):
- """
- Do the actual simulation.
- """
- self.__realtime_counter = self.__realtime_counter_max
- while True:
- if self.__check():
- self.__finish()
- break
- self.update_clock()
- before = time.time()
- self.do_single_step()
- self.__duration_log.append(time.time() - before)
- if self.__threading_backend_subsystem == Platform.GLA:
- self.__threading_backend.wait(self.getDeltaT(), self.__runsim)
- break
- if self.__realtime and self.__realtimeWait():
- # Next event has been scheduled, kill this process
- break
- def __computeBlocks(self, sortedGraph, depGraph, curIteration):
- """
- Compute the new state of the model.
- Args:
- sortedGraph: The set of strong components.
- depGraph: A dependency graph.
- curIteration (int): Current simulation iteration.
- """
- for component in sortedGraph:
- if not self.__hasCycle(component, depGraph):
- block = component[0] # the strongly connected component has a single element
- block.compute(curIteration)
- else:
- # Detected a strongly connected component
- self.__solver.checkValidity(self.model.getPath(), component)
- solverInput = self.__solver.constructInput(component, curIteration)
- self.__solver.solve(solverInput)
- solutionVector = solverInput[1]
- for block in component:
- blockIndex = component.index(block)
- block.appendToSignal(solutionVector[blockIndex])
- def __hasCycle(self, component, depGraph):
- """
- Determine whether a component is cyclic or not.
- Args:
- component (list): The set of strong components.
- depGraph: The dependency graph.
- """
- assert len(component) >= 1, "A component should have at least one element"
- if len(component) > 1:
- return True
- else: # a strong component of size one may still have a cycle: a self-loop
- if depGraph.hasDependency(component[0], component[0]):
- return True
- else:
- return False
- def __progress_update(self):
- """
- Updates the progress bar.
- """
- assert _TQDM_FOUND, "Module tqdm not found. Progressbar is not possible."
- end = self.__termination_time
- pbar = tqdm(total=end, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n:.2f}/{total_fmt} '
- '[{elapsed}/{remaining}, {rate_fmt}{postfix}]')
- last = 0.0
- while not self.__finished:
- now = self.getTime()
- # print(end, now, last)
- pbar.update(min(now, end) - last)
- last = now
- time.sleep(0.5) # Only update every half a second
- if last < end:
- pbar.update(end - last)
- pbar.close()
- # TODO: prints immediately after break pbar...
- self.__progress_finished = True
- def connect(self, name, function):
- """
- Connect an event with an additional function.
- The functions will be called in the order they were connected to the
- events, with the associated arguments. The accepted signals are:
- - :code:`started`: Raised whenever the simulation setup has completed,
- but before the actual simulation begins.
- - :code:`finished`: Raised whenever the simulation finishes.
- Args:
- name (str): The name of the signal to raise.
- function: A function that will be called with the optional arguments
- whenever the event is raised.
- """
- if name not in self.__events:
- raise ValueError("Invalid signal '%s' in Simulator." % name)
- self.__events[name].append(function)
- def signal(self, name, *args):
- """
- Raise a signal with a specific name and arguments.
- The accepted signals are:
- - :code:`started`: Raised whenever the simulation setup has completed,
- but before the actual simulation begins.
- - :code:`finished`: Raised whenever the simulation finishes.
- Note:
- Normally, users do not need to call this function.
- Args:
- name (str): The name of the signal to raise.
- *args: Additional arguments to pass to the connected events.
- See Also:
- :func:`connect`
- """
- if name not in self.__events:
- raise ValueError("Invalid signal '%s' in Simulator." % name)
- for evt in self.__events[name]:
- evt(*args)
- from .stepsize import *
|