simulator.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Main simulator class to be used as an interface to the user
  17. """
  18. import pypdevs.middleware as middleware
  19. # Fetch the rank and size of this simulation run
  20. # Don't get it ourself (e.g. from MPI), as we are possibly not using MPI
  21. nested = False
  22. was_main = False
  23. rank, size = middleware.startupMiddleware()
  24. from pypdevs.util import *
  25. from pypdevs.DEVS import *
  26. from pypdevs.basesimulator import *
  27. from pypdevs.server import *
  28. from pypdevs.logger import *
  29. import threading
  30. import pypdevs.accurate_time as time
  31. import os
  32. # Try loading cPickle as it is faster, though otherwise fall back to normal pickling
  33. try:
  34. import cPickle as pickle
  35. except ImportError:
  36. import pickle
  37. # For Python2 and Python3 compliance
  38. try:
  39. import Queue as Queue
  40. except ImportError:
  41. # Fix for parallel DEVS?
  42. # from multiprocessing import Queue
  43. import queue as Queue
  44. def local(sim):
  45. """
  46. Test whether or not the simulation is done locally
  47. :param sim: the simulator with the locations
  48. :returns: bool -- whether or not the simulation is local
  49. """
  50. if len(sim.locations) == 0:
  51. raise DEVSException("There are no Atomic DEVS models present in your provided model")
  52. return sim.server.size == 1
  53. def loadCheckpoint(name):
  54. """
  55. Load a previously created simulation from a saved checkpoint.
  56. :param name: the name of the model to provide some distinction between different simulation models
  57. :returns: either None if no recovery was possible, or the Simulator object after simulation
  58. """
  59. listdir = os.listdir('.')
  60. if str(name) + "_SIM.pdc" not in listdir:
  61. # Simulator object not even found, don't bother continuing
  62. #assert info("Not even a SIM file was found for the requested name, giving up already")
  63. return
  64. try:
  65. infile = open("%s_SIM.pdc" % (name), 'r')
  66. simulator = pickle.load(infile)
  67. infile.close()
  68. except:
  69. return
  70. # Use an rsplit, as it is possible that the user defined name contains _ too
  71. files = [f for f in listdir
  72. if (f.endswith(".pdc") and
  73. not f.endswith("SIM.pdc") and
  74. f.rsplit('_', 2)[0] == name)]
  75. if len(files) == 0:
  76. return
  77. #assert debug("Got matching files: " + str(files))
  78. max_gvt = 0
  79. nodes = middleware.COMM_WORLD.Get_size()
  80. noncomplete_checkpoint = True
  81. found_files = {}
  82. found_gvts = []
  83. for f in files:
  84. gvt = float(f.split('_')[-2])
  85. if gvt in found_files:
  86. found_files[gvt].append(f)
  87. else:
  88. found_files[gvt] = [f]
  89. found_gvts.append(gvt)
  90. found_gvts.sort()
  91. gvt = 0
  92. # Construct a temporary server
  93. from pypdevs.middleware import COMM_WORLD
  94. server = Server(middleware.COMM_WORLD.Get_rank(),
  95. middleware.COMM_WORLD.Get_size())
  96. while len(found_gvts) > 0:
  97. gvt = found_gvts[-1]
  98. if len(found_files[gvt]) < nodes:
  99. found_gvts.pop()
  100. gvt = 0
  101. else:
  102. if gvt == 0:
  103. return None
  104. for rank in range(server.size):
  105. if not server.getProxy(rank).checkLoadCheckpoint(name, gvt):
  106. # One of the proxies denied, try next
  107. found_gvts.pop()
  108. gvt = 0
  109. break
  110. if gvt != 0:
  111. # If we got here, we are done and can load it
  112. break
  113. if gvt == 0:
  114. #raise DEVSException("All currently present pdc files are unusable, please remove them all to force a fresh simulation run!")
  115. if COMM_WORLD.Get_size() > 1:
  116. # We need to shut down the already started MPI server...
  117. COMM_WORLD.isend(0, dest=0, tag=0)
  118. return None
  119. simulator.server = server
  120. for rank in range(server.size):
  121. server.getProxy(rank).loadCheckpoint(name, gvt)
  122. #assert info("Recovering from time " + str(gvt))
  123. simulator.loadCheckpoint()
  124. return simulator
  125. class Simulator(object):
  126. """
  127. Associates a hierarchical DEVS model with the simulation engine.
  128. """
  129. def __init__(self, model):
  130. """
  131. Constructor of the simulator.
  132. :param model: a valid model (created with the provided functions)
  133. """
  134. from pypdevs.simconfig import SimulatorConfiguration
  135. self.config = SimulatorConfiguration(self)
  136. from pypdevs.middleware import COMM_WORLD
  137. # Simulator is always started at the controller
  138. self.server = Server(0, size)
  139. self.model = model
  140. self.listeners = {}
  141. global nested
  142. global was_main
  143. if nested:
  144. was_main = False
  145. else:
  146. nested = True
  147. was_main = True
  148. # Initialize all options
  149. self.init_done = False
  150. self.run_gvt = True
  151. self.callbacks = []
  152. self.termination_models = set()
  153. self.fetch_all = False
  154. self.tracers = []
  155. self.cell = False
  156. self.x_size = None
  157. self.y_size = None
  158. self.cell_file = "celldevstrace.txt"
  159. self.cell_multifile = False
  160. self.termination_time = float('inf')
  161. self.termination_condition = None
  162. self.address = ('localhost', 514)
  163. import logging
  164. self.loglevel = logging.DEBUG
  165. self.checkpoint_interval = -1
  166. self.checkpoint_name = "(none)"
  167. self.gvt_interval = 1
  168. self.state_saving = 2
  169. self.msg_copy = 0
  170. self.realtime = False
  171. self.realtime_port_references = {}
  172. self.subsystem = "python"
  173. self.generator_file = None
  174. self.relocations = []
  175. self.progress = False
  176. self.draw_model = False
  177. self.hide_edge_labels = False
  178. self.setup = False
  179. self.allow_local_reinit = False
  180. self.modify_values = {}
  181. self.modify_state_values = {}
  182. self.activity_tracking = False
  183. self.activity_visualisation = False
  184. self.location_cell_view = False
  185. self.sort_on_activity = False
  186. from pypdevs.relocators.manualRelocator import ManualRelocator
  187. self.activity_relocator = ManualRelocator()
  188. self.dsdevs = False
  189. self.memoization = False
  190. self.classicDEVS = False
  191. self.setSchedulerActivityHeap()
  192. self.locations_file = None
  193. self.allocator = None
  194. self.accept_external_input = False
  195. self.realtime_extra = []
  196. self.model_ids = []
  197. self.locations = defaultdict(list)
  198. self.model.finalize(name="",
  199. model_counter=0,
  200. model_ids=self.model_ids,
  201. locations=self.locations,
  202. select_hierarchy=[])
  203. # Allow the model to provide some of its own configuration
  204. self.model.simSettings(self)
  205. def __getattr__(self, func):
  206. """
  207. Get the specified attribute, all setters/registers will be redirected to the configuration code.
  208. :param func: function that is called
  209. :returns: requested attribute
  210. """
  211. if func.startswith("set") or func.startswith("register"):
  212. # Redirect to configuration backend
  213. return getattr(self.config, func)
  214. else:
  215. # Just an unknown attribute
  216. raise AttributeError(func)
  217. def runStartup(self):
  218. """
  219. Perform startup of the simulator right before simulation
  220. """
  221. self.startAllocator()
  222. # Controller is the main simulation server
  223. # Remap every mandatory model to the controller
  224. for model_id in self.termination_models:
  225. model = self.model_ids[model_id]
  226. self.locations[model.location].remove(model_id)
  227. self.locations[0].append(model_id)
  228. model.location = 0
  229. model.relocatable = False
  230. # Code without location goes to the controller
  231. # Delay this code as auto allocation could be possible
  232. for model_id in self.locations[None]:
  233. # All these models are not yet initialized, so set them to the default
  234. self.model_ids[model_id].location = 0
  235. self.locations[0].extend(self.locations[None])
  236. del self.locations[None]
  237. self.controller = self.server.getProxy(0)
  238. if self.draw_model:
  239. #assert info("Drawing model hierarchy")
  240. out = open(self.draw_model_file, 'w')
  241. out.write("digraph G {\n")
  242. self.drawModelHierarchy(out, self.model)
  243. self.model.listeners = self.listeners
  244. if isinstance(self.model, CoupledDEVS):
  245. self.model.component_set = directConnect(self.model.component_set, self.listeners)
  246. elif isinstance(self.model, AtomicDEVS):
  247. for p in self.model.IPorts:
  248. p.routing_inline = []
  249. p.routing_outline = []
  250. for p in self.model.OPorts:
  251. p.routing_inline = []
  252. p.routing_outline = []
  253. else:
  254. raise DEVSException("Unkown model being simulated")
  255. if self.allocator is not None and self.allocator.getTerminationTime() == 0.0:
  256. # It is a static allocator, so this can be done right now!
  257. allocs = self.allocator.allocate(self.model.component_set,
  258. None,
  259. self.server.size,
  260. None)
  261. for model_id, location in allocs.items():
  262. self.model_ids[model_id].location = location
  263. saveLocations("locationsave.txt", allocs, self.model_ids)
  264. self.allocator = None
  265. if self.draw_model and self.allocator is None:
  266. out = open(self.draw_model_file, 'a')
  267. self.drawModelConnections(out, self.model, None)
  268. out.write("}")
  269. self.draw_model = False
  270. nodes = len(self.locations.keys())
  271. if None in self.locations:
  272. nodes -= 1
  273. if nodes < self.server.size:
  274. # Less locations used than there exist
  275. #assert warn("Not all initialized MPI nodes are being used in the model setup! Auto allocation could fix this.")
  276. pass
  277. elif nodes > self.server.size:
  278. raise DEVSException(
  279. "Not enough MPI nodes started for the distribution given " +
  280. "in the model! Models requested at location %i, max node = %i"
  281. % (max(self.locations.keys()), self.server.size - 1))
  282. def drawModelHierarchy(self, outfile, model):
  283. """
  284. Draws the hierarchy by creating a Graphviz Dot file. This merely creates the first part: the hierarchy of all models
  285. :param outfile: a file handle to write text to
  286. :param model: the model to draw
  287. """
  288. from pypdevs.colors import colors
  289. if isinstance(model, CoupledDEVS):
  290. outfile.write(' subgraph "cluster%s" {\n' % (model.getModelFullName()))
  291. outfile.write(' label = "%s"\n' % model.getModelName())
  292. outfile.write(' color=black\n')
  293. for m in model.component_set:
  294. self.drawModelHierarchy(outfile, m)
  295. outfile.write(' }\n')
  296. elif isinstance(model, AtomicDEVS):
  297. if model.location >= len(colors):
  298. #assert warn("Not enough colors supplied in colors.py for this number of nodes! Defaulting to white")
  299. color = "white"
  300. else:
  301. color = colors[model.location]
  302. outfile.write((' "%s" [\n label = "%s\\nState: %s"\n' +
  303. ' color="%s"\n style=filled\n]\n')
  304. % (model.getModelFullName(),
  305. model.getModelName(),
  306. model.state,
  307. color))
  308. def drawModelConnections(self, outfile, model, colors=None):
  309. """
  310. Draws all necessary connections between the model
  311. :param outfile: the file to output to
  312. :param model: a CoupledDEVS model whose children should be drawn
  313. :param colors: the colors to draw on the connections. Only used when an initial allocator is used.
  314. """
  315. if colors is not None:
  316. max_events = 0
  317. for i in colors:
  318. for j in colors[i]:
  319. if colors[i][j] > max_events:
  320. max_events = colors[i][j]
  321. for source in model.component_set:
  322. for source_port in source.OPorts:
  323. for destination_port, _ in source_port.routing_outline:
  324. destination = destination_port.host_DEVS
  325. if colors is not None:
  326. #TODO color is not yet perfect
  327. try:
  328. absolute_color = colors[source][destination]
  329. relative_color = '"%s 1 1"' \
  330. % (1 / (absolute_color / float(3 * max_events)))
  331. except KeyError:
  332. # Simply no message transfer
  333. absolute_color = 0
  334. relative_color = '"1 1 1"'
  335. outfile.write(' "%s" -> "%s" '
  336. % (source.getModelFullName(),
  337. destination.getModelFullName()))
  338. if self.hide_edge_labels and colors is None:
  339. outfile.write(';\n')
  340. elif self.hide_edge_labels and colors is not None:
  341. outfile.write('[label="%s",color=%s];\n'
  342. % (absolute_color, relative_color))
  343. elif not self.hide_edge_labels and colors is None:
  344. outfile.write('[label="%s -> %s"];\n'
  345. % (source_port.getPortName(),
  346. destination_port.getPortName()))
  347. elif not self.hide_edge_labels and colors is not None:
  348. outfile.write('[label="%s -> %s (%s)",color=%s];\n'
  349. % (source_port.getPortName(),
  350. destination_port.getPortName(),
  351. absolute_color,
  352. relative_color))
  353. def checkpoint(self):
  354. """
  355. Create a checkpoint of this object
  356. """
  357. outfile = open(str(self.checkpoint_name) + "_SIM.pdc", 'w')
  358. if self.flattened:
  359. self.model.flattenConnections()
  360. pickle.dump(self, outfile)
  361. if self.flattened:
  362. self.model.unflattenConnections()
  363. def loadCheckpoint(self):
  364. """
  365. Alert the Simulator that it was restored from a checkpoint and thus can take some shortcuts
  366. """
  367. self.controller = self.server.getProxy(0)
  368. self.real_simulate()
  369. def startAllocator(self):
  370. """
  371. Set the use of an allocator if required, thus forcing all models to run at the controller
  372. """
  373. if self.allocator is not None:
  374. self.activity_tracking = True
  375. # Make simulation local for event capturing
  376. for model in self.model.component_set:
  377. model.setLocation(0, force=True)
  378. def loadLocationsFromFile(self, filename):
  379. """
  380. Try to load a file containing the allocation of the nodes. If such a (valid) file is found, True is returned. Otherwise False is returned.
  381. This can thus easily be used in a simulator experiment file as a condition for setting an allocator (e.g. check for an allocation file and if
  382. none is found, create one by running the allocator first).
  383. A partially valid file will not be used; a file does not need to specify an allocation for all models, those that aren't mentioned are simply
  384. skipped and their default allocation is used (as specified in the model itself).
  385. :param filename: the name of the file to use
  386. :returns: bool -- success of the operation
  387. """
  388. try:
  389. f = open(filename, 'r')
  390. locs = {}
  391. for line in f:
  392. split = line.split(" ", 2)
  393. model_id = int(split[0])
  394. location = int(split[1])
  395. modelname = modelname[:-1]
  396. # Check for compliance first, otherwise the locations are loaded partially
  397. if self.model_ids[model_id].getModelFullName() != modelname:
  398. return False
  399. else:
  400. locs[model_id] = location
  401. f.close()
  402. # Everything seems to be fine, so do the actual allocations now
  403. for model_id in locs:
  404. self.model_ids[model_id].location = locs[model_id]
  405. return True
  406. except:
  407. return False
  408. def reinit(self):
  409. """
  410. Reinitialize the model, so that a new *simulate()* call will restart the simulation anew.
  411. This is possible in both local and distributed simulation,
  412. though it requires the *setAllowLocalReinit* method to be called first if you are running local simulation.
  413. The additional method call is required as allowing reinitialisation requires the complete model to be saved twice (a reinit copy and a working copy).
  414. **Do NOT call this method directly, but call it through the simconfig file**
  415. """
  416. loclist = range(self.server.size)
  417. proxylist = [self.server.getProxy(location) for location in loclist]
  418. # Send to very model to clear the simulation memory
  419. if not self.allow_local_reinit and len(proxylist) == 1:
  420. raise DEVSException("Reinitialisation for local simulation is disabled by default, please enable it with the configuration method 'setAllowLocalReinit()'")
  421. for i, proxy in enumerate(proxylist):
  422. proxy.resetSimulation(self.scheduler_locations[i])
  423. def modifyState(self, model_id, state):
  424. """
  425. Modify the state of a specific model.
  426. **Do NOT call this method directly, but call it through the simconfig interface**
  427. :param model_id: the model_id of the model to modify the state from
  428. :param state: the state to configure
  429. """
  430. proxy = self.server.getProxy(self.model_ids[model_id].location)
  431. proxy.setAttr(model_id, "state", state)
  432. self.controller.stateChange(model_id, "model.state", state)
  433. def modifyStateAttr(self, model_id, attr, value):
  434. """
  435. Modify an attribute of the state of a specific model.
  436. **Do NOT call this method directly, but call it through the simconfig interface**
  437. :param model_id: the model_id of the model to modify the state from
  438. :param attr: the name of the attribute of the state to modify
  439. :param value: the value to set as attribute
  440. """
  441. proxy = self.server.getProxy(self.model_ids[model_id].location)
  442. proxy.setStateAttr(model_id, attr, value)
  443. self.controller.stateChange(model_id, "model.state.%s" % attr, value)
  444. def modifyAttributes(self, model_id, attr, value):
  445. """
  446. Modify an attribute of a specific model.
  447. **Do NOT call this method directly, but call it through the simconfig interface**
  448. :param model_id: the model_id of the model to modify the state from
  449. :param attr: the name of the attribute of the model to modify
  450. :param value: the value to set as attribute
  451. """
  452. for dst in range(self.server.size):
  453. self.server.getProxy(dst).setAttr(model_id, attr, value)
  454. self.controller.stateChange(model_id, "model.%s" % attr, value)
  455. def simulate(self):
  456. """
  457. Start simulation with the previously set options. Can be reran afterwards to continue the simulation of
  458. the model (or reinit it first and restart simulation), possibly after altering some aspects of the model with the provided methods.
  459. """
  460. loclist = range(self.server.size)
  461. proxylist = [self.server.getProxy(location) for location in loclist]
  462. if not self.setup:
  463. self.runStartup()
  464. self.relocations.sort()
  465. for directive in self.relocations:
  466. if directive[1] in self.termination_models:
  467. raise DEVSException("Termination model was found as a relocation directive!")
  468. # self.locations is now untrusted, as it is possible for migration to happen!
  469. self.locations = defaultdict(list)
  470. # Broadcast the model, do this slightly more intelligent than by iterating over the list by using provided functions and exploiting maximal parallellism
  471. self.flattened = False
  472. # Fill in all schedulers
  473. for location in loclist:
  474. if location not in self.scheduler_locations:
  475. self.scheduler_locations[location] = self.scheduler_type
  476. try:
  477. # Try broadcasting as-is
  478. broadcastModel((self.model,
  479. self.model_ids,
  480. self.flattened),
  481. proxylist,
  482. self.allow_local_reinit,
  483. self.scheduler_locations)
  484. self.flattened = False
  485. except RuntimeError:
  486. # Something went wrong, probably exceeded the maximum recursion depth while pickling
  487. #assert warn("Normal sending not possible due to big recursion, trying auto flattening")
  488. try:
  489. # Try decoupling the ports from the actual models to limit recursion that our simulation framework induced
  490. self.model.flattenConnections()
  491. # Broadcast again, but now mention that the ports were altered
  492. self.flattened = True
  493. broadcastModel((self.model,
  494. self.model_ids,
  495. self.flattened),
  496. proxylist,
  497. self.allow_local_reinit,
  498. self.scheduler_locations)
  499. except RuntimeError as e:
  500. # Even that didn't solve it, user error!
  501. # Stop the nodes from waiting for a broadcast
  502. broadcastCancel()
  503. import sys
  504. raise DEVSException("Could not send model to remote destination due to pickling error: " + str(e))
  505. # Prevent further setups
  506. self.setup = True
  507. for proxy in proxylist:
  508. proxy.setGlobals(tracers=self.tracers,
  509. address=self.address,
  510. loglevel=self.loglevel,
  511. checkpoint_frequency=self.checkpoint_interval,
  512. checkpoint_name = self.checkpoint_name,
  513. kernels=len(loclist),
  514. statesaver=self.state_saving,
  515. memoization=self.memoization,
  516. msg_copy=self.msg_copy)
  517. # Set the verbosity on the controller only, otherwise each kernel
  518. # would open the file itself, causing problems. Furthermore, all
  519. # verbose output will be sent to the controller
  520. self.controller.setAllocator(self.allocator)
  521. self.controller.setRelocator(self.activity_relocator)
  522. self.controller.setDSDEVS(self.dsdevs)
  523. self.controller.setActivityTracking(self.activity_tracking)
  524. self.controller.setClassicDEVS(self.classicDEVS)
  525. self.controller.setAcceptExternalInputs(self.accept_external_input)
  526. self.controller.setCellLocationTracer(self.x_size,
  527. self.y_size,
  528. self.location_cell_view)
  529. # Clear this up as we would reregister them otherwise
  530. self.tracers = []
  531. if self.realtime:
  532. if len(loclist) > 1:
  533. raise DEVSException("Real time simulation only possible locally")
  534. self.controller.setRealTime(self.subsystem,
  535. self.generator_file,
  536. self.realtime_port_references,
  537. self.realtime_scale,
  538. self.listeners,
  539. self.realtime_extra)
  540. # Check whether global or local termination should be used
  541. if self.termination_condition is not None:
  542. # Only set the condition on the controller
  543. proxy = self.server.getProxy(0)
  544. proxy.setTerminationCondition(self.termination_condition)
  545. else:
  546. # Global termination time
  547. for proxy in proxylist:
  548. proxy.setTerminationTime((self.termination_time, float('inf')))
  549. if self.checkpoint_interval > 0:
  550. self.checkpoint()
  551. self.real_simulate()
  552. def removeTracers(self):
  553. """
  554. Remove all currently registered tracers.
  555. """
  556. loclist = range(self.server.size)
  557. proxylist = [self.server.getProxy(location) for location in loclist]
  558. for proxy in proxylist:
  559. proxy.removeTracers()
  560. def realtime_loop_call(self):
  561. """
  562. Perform a single iteration in the loop for real time simulation
  563. """
  564. self.controller.gameLoop()
  565. def realtime_interrupt(self, string):
  566. """
  567. Generate an interrupt for the realtime backend using a method call.
  568. :param string: the value to interrupt with, should be of the form "port value"
  569. """
  570. self.controller.realtimeInterrupt(string)
  571. def showProgress(self, locations):
  572. """
  573. Shows the progress of the simulation by polling all locations that are passed. Should run on a seperate thread as this blocks!
  574. :param locations: list of all locations to access
  575. """
  576. # 80 is somewhat default...
  577. consolewidth = 80
  578. # Delete 4 for the prefix, 5 for the suffix
  579. barwidth = consolewidth - 4 - 5
  580. finishtime = self.termination_time
  581. first = True
  582. # Local simulation is always 'committed'
  583. self.fillchar = "=" if len(locations) > 1 else "#"
  584. gvt = 0.0
  585. while 1:
  586. # Several dirty checks for whether or not the simulation is done, if it is finished no more calls should be needed
  587. # Keep doing this until the main thread exits, this should be a thread!
  588. if self.checkpoint_interval > -1:
  589. # Don't use an event while checkpointing, as this is unpicklable
  590. time.sleep(1)
  591. else:
  592. self.progress_event.wait(1)
  593. if not first:
  594. for _ in locations:
  595. sys.stdout.write("\033[F")
  596. first = False
  597. if self.progress_finished and self.fillchar != "E":
  598. gvt = finishtime
  599. elif self.progress_finished and self.fillchar == "E":
  600. # Don't update the GVT variable
  601. if len(locations) == 1:
  602. # The gvt is actually kind of the node time
  603. gvt = nodetime
  604. else:
  605. gvt = max(self.controller.getGVT(), 0)
  606. gvt_percentage = int(gvt / finishtime * 100)
  607. gvt_length = int(min(barwidth, gvt_percentage * barwidth / 100))
  608. for node in locations:
  609. if self.progress_finished:
  610. nodetime = float('inf')
  611. else:
  612. nodetime = self.controller.getProxy(node).getTime()
  613. if nodetime == float('inf'):
  614. nodetime = finishtime
  615. s = "%2d" % node
  616. s += " |"
  617. percentage = int(nodetime / finishtime * 100)
  618. s += "#" * gvt_length
  619. length = int(min(barwidth, percentage * barwidth / 100) - gvt_length)
  620. s += self.fillchar * length
  621. s += " " * int(barwidth - gvt_length - length)
  622. if percentage == 100 and self.fillchar != "E":
  623. s += "|DONE"
  624. elif percentage == 100 and self.fillchar == "E":
  625. s += "|FAIL"
  626. else:
  627. s += "| %2d" % percentage + "%"
  628. print("\r" + s)
  629. if self.progress_finished:
  630. return
  631. def real_simulate(self):
  632. """
  633. The actual simulation part, this is identical for the 'start from scratch' and 'start from checkpoint' algorithm, thus it was split up
  634. """
  635. locations = range(self.server.size)
  636. thread = threading.Thread(target=self.showProgress, args=[locations])
  637. try:
  638. ## Progress visualisation code
  639. if self.progress:
  640. if self.termination_time == float('inf'):
  641. #assert warning("Progress visualisation is only possible if a termination time is used instead of a termination condition")
  642. self.progress = False
  643. #elif self.verbose and self.verbose_file is None:
  644. # #assert warning("Progress visualisation is not allowed when printing verbose output")
  645. # pass
  646. # self.progress = False
  647. else:
  648. self.progress_finished = False
  649. # thread = threading.Thread(target=self.show_progress,
  650. # args=[locations])
  651. if self.checkpoint_interval < 0:
  652. self.progress_event = threading.Event()
  653. thread.start()
  654. # Local simulation can take a shortcut
  655. if len(locations) == 1:
  656. if self.checkpoint_interval > 0:
  657. # If we use checkpointing, we will need a GVT thread running
  658. self.controller.startGVTThread(self.gvt_interval)
  659. # Simply do a blocking call, thus preventing the finish ring algorithm
  660. #begin = time.time()
  661. self.controller.getProxy(locations[0]).simulate_sync()
  662. #print(time.time() - begin)
  663. else:
  664. self.controller.startGVTThread(self.gvt_interval)
  665. for location in locations:
  666. # Don't run all of these on a seperate thread, as it returns no result
  667. self.controller.getProxy(location).simulate()
  668. # Here, the simulation is running and we wait for it to end...
  669. self.controller.waitFinish(len(locations))
  670. # It seems that all nodes have finished!
  671. #assert debug("Finished waiting for all processors")
  672. except DEVSException as e:
  673. print(e)
  674. # Should also exit on a DEVSException since this isn't really meant to happen
  675. import sys
  676. # Return an errorcode, as we ended abruptly
  677. sys.exit(1)
  678. except:
  679. # Try to stop the progress bar thread if this exists, otherwise we hang forever
  680. if self.progress:
  681. self.fillchar = "E"
  682. self.progress_finished = True
  683. if self.checkpoint_interval > -1:
  684. # With checkpointing running, we need to do this the hard way...
  685. self.progress_event.set()
  686. # Wait for it to end
  687. thread.join()
  688. # Reraise the initial exception, this code was only here to stop the progress bar :)
  689. raise
  690. # Perform all pending operations
  691. #assert debug("Performing all delayed actions")
  692. self.controller.performActions()
  693. # Stop all running tracers
  694. #assert debug("Stopping all tracers")
  695. self.controller.stopTracers()
  696. # Now the state is stable, fetch all registered states before shutting down
  697. #assert debug("Fetching all requested states")
  698. if len(self.callbacks) > 0:
  699. for variable, model_id in self.callbacks:
  700. # Ask the controller on which node this model currently runs, calls to the controller
  701. # are very fast, as this runs locally. Otherwise a complete location dictionary would
  702. # have to be pickled and unpickled, but also filtered for local models, begin O(n) instead
  703. # of the current O(1), at the cost of more function calls
  704. location = self.controller.getLocation(model_id)
  705. proxy = self.controller.getProxy(location)
  706. state = proxy.getState(model_id)
  707. #assert debug("Setting state for " + str(variable))
  708. setattr(self, variable, state)
  709. if self.fetch_all:
  710. #assert info("Downloading model from locations")
  711. # We must download the state from each and every model
  712. for model in self.model.component_set:
  713. location = self.controller.getLocation(model.model_id)
  714. proxy = self.controller.getProxy(location)
  715. model.state = proxy.getState(model.model_id)
  716. # Shut down every location
  717. #assert debug("Shutting down servers")
  718. for loc in locations:
  719. proxy = self.controller.getProxy(loc)
  720. # If this is oneway, we will stop the simulation even before the finish was called everywhere
  721. proxy.finish()
  722. self.progress_finished = True
  723. # A progress bar was running without checkpointing: set the event to finish it
  724. if self.progress and self.checkpoint_interval <= 0:
  725. self.progress_event.set()
  726. # Activity tracking is enabled, so visualize it in whatever way was configured
  727. if self.activity_visualisation:
  728. visualizeActivity(self)
  729. # Check if the model was to be visualized
  730. if self.draw_model:
  731. # Possibly include event count visualisation
  732. #colors = self.controller.runAllocator()
  733. colors = self.controller.getEventGraph()
  734. #assert info("Drawing model distribution")
  735. out = open(self.draw_model_file, 'a')
  736. self.drawModelConnections(out, self.model, colors)
  737. out.write("}")
  738. global was_main
  739. if was_main:
  740. global nested
  741. nested = False