basesimulator.py 62 KB


  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Actual simulation kernel
  17. """
  18. from pypdevs.solver import Solver
  19. from pypdevs.util import *
  20. from pypdevs.messageScheduler import MessageScheduler
  21. from pypdevs.message import NetworkMessage
  22. from pypdevs.DEVS import RootDEVS, CoupledDEVS, AtomicDEVS
  23. from pypdevs.statesavers import *
  24. import threading
  25. from pypdevs.logger import *
  26. try:
  27. import cPickle as pickle
  28. except ImportError:
  29. import pickle
  30. from pypdevs.tracer import Tracers
  31. from pypdevs.activityVisualisation import *
  32. from collections import defaultdict
  33. import pypdevs.accurate_time as time
  34. try:
  35. import Queue
  36. except ImportError:
  37. import queue as Queue
  38. from collections import deque
  39. class BaseSimulator(Solver):
  40. """
  41. The BaseSimulator class, this is the actual simulation kernel.
  42. """
  def __init__(self, name, model, server):
      """
      Build a simulation kernel.

      :param name: the name of the kernel
      :param model: the model to initialise the kernel with
      :param server: stored as ``self.server``; proxies to kernels are requested from it
      """
      Solver.__init__(self)
      # inits() clears self.model, so the model is only assigned afterwards
      self.inits()

      # Identity of this kernel and the objects it works with
      self.name = name
      self.model = model
      self.server = server
      self.tracers = Tracers()

      # Counters
      self.reverts = self.send_msg_counter = 0
      self.msg_sent = self.msg_recv = 0
      self.transitioning = defaultdict(int)
      self.total_activities = defaultdict(float)

      # Flags, all disabled at construction time
      self.finish_sent = False
      self.irreversible = self.temporary_irreversible = False
      self.checkpoint_restored = False
      self.realtime = False
      self.reset = False
      self.use_DSDEVS = False
      self.activity_tracking = False
      self.memoization = False
      self.simlock_request = False
  def resetSimulation(self, scheduler):
      """
      Resets the simulation kernel to the saved version; can only be invoked after a previous simulation run.

      The model itself is not kept: it is reloaded by handing ``self.pickled_model``
      to ``saveAndProcessModel`` on the proxy of this kernel.

      :param scheduler: the scheduler to set
      """
      # inits() wipes the model, its id list and recreates all locks and events
      self.inits()
      self.finish_sent = False
      self.reverts = 0
      self.transitioning = defaultdict(int)
      self.tracers = Tracers()
      self.irreversible = False
      self.temporary_irreversible = False
      self.send_msg_counter = 0
      self.checkpoint_restored = False
      self.realtime = False
      # Mark that this kernel went through a reset
      self.reset = True
      proxy = self.getProxy(self.name)
      proxy.saveAndProcessModel(self.pickled_model, scheduler)
  90. def __setstate__(self, retdict):
  91. """
  92. For pickling
  93. :param retdict: dictionary containing the attributes to set
  94. """
  95. self.inits()
  96. for i in retdict:
  97. setattr(self, i, retdict[i])
  98. def __getstate__(self):
  99. """
  100. For pickling
  101. :returns: dictionary containing attributes and their values
  102. """
  103. retdict = {}
  104. unpicklable = frozenset(["instancemethod",
  105. "lock",
  106. "_Event",
  107. "Thread",
  108. "method-wrapper",
  109. "builtin_function_or_method"])
  110. unnecessary = frozenset(["input_scheduler",
  111. "inqueue",
  112. "actions",
  113. "server",
  114. "msg_sent",
  115. "msg_recv",
  116. "send_msg_counter",
  117. "output_queue",
  118. "accumulator",
  119. "control_msg",
  120. "transitioning",
  121. "Vchange",
  122. "V"])
  123. for i in dir(self):
  124. if getattr(self, i).__class__.__name__ in unpicklable:
  125. # unpicklable, so don't copy it
  126. continue
  127. elif str(i) == "tracers":
  128. retdict["tracers"] = self.tracers.tracers_init
  129. elif (str(i) not in unnecessary) and (not str(i).startswith("__")):
  130. retdict[str(i)] = getattr(self, i)
  131. return retdict
  def inits(self):
      """
      Put the kernel in its initial state.

      Kept separate from the constructor so that a kernel can be reset without
      having to construct a new one.
      """
      infinity = float('inf')

      # Model bookkeeping
      self.model = None
      self.model_ids = []
      self.actions = []
      self.waiting = 0
      self.finished = False

      # Termination settings
      self.termination_time = (infinity, 1)
      self.termination_time_check = True
      self.termination_condition = None

      # Simulation time
      self.prevtime = (0, 0)
      self.prevtime_finished = False
      self.clock = (-infinity, 0)
      self.gvt = -infinity

      # Message queues
      self.block_outgoing = None
      self.send_msg_counter = 0
      self.input_scheduler = MessageScheduler()
      self.output_queue = []
      self.inqueue = deque()

      # Mattern's GVT algorithm
      #   0 = white1
      #   1 = red1
      #   2 = white2
      #   3 = red2
      self.color = 0
      self.V = [{} for _ in range(4)]
      self.Vchange = [threading.Event() for _ in range(4)]
      self.Vlock = threading.Lock()
      self.Tmin = infinity
      self.control_msg = None
      self.accumulator = defaultdict(int)
      self.wait_for_gvt = threading.Event()

      # Counter, lock and event that are updated together by
      # requestMigrationLock, migrationUnlock and receiveAntiMessages
      self.priorcount = 0
      self.priorlock = threading.Lock()
      self.priorevent = threading.Event()
      self.priorevent.set()
      self.relocation_pending = False

      # Remaining locks and events
      self.actionlock = threading.Lock()
      self.should_run = threading.Event()
      self.sim_finish = threading.Event()
      self.simlock = threading.Lock()
      # Acquire the lock ASAP, to prevent simulation during/after shutdown
      # it has to be released as soon as the simulation is commenced
      self.simlock.acquire()
  def getProxy(self, rank):
      """
      Return a proxy to the kernel with the given rank.

      The request is simply delegated to the server object of this kernel.

      :param rank: the rank to return a proxy to
      :returns: whatever the server returns for this rank
      """
      server = self.server
      return server.getProxy(rank)
  def getSelfProxy(self):
      """
      Return a proxy to this very kernel.

      Useful for code that does not know on which node it runs and only wants to
      reach its own kernel. Going through the proxy differs from calling the kernel
      object directly, as the proxy wraps asynchronous local invocation.

      :returns: the proxy that the server returns for ``self.name``
      """
      own_rank = self.name
      return self.server.getProxy(own_rank)
  def setTerminationTime(self, time):
      """
      Set the time at which the simulation has to stop; this overrides the local
      simulation condition.

      :param time: the time at which the simulation should stop
      """
      self.termination_time = time
      # The kernel might already have stopped before an invalidation arrived,
      # so signal it that it should run again
      wake_up = self.should_run
      wake_up.set()
  def sendModel(self, model, model_ids, scheduler_type, flattened):
      """
      Send a model to this simulation kernel, as this one will be simulated

      Fills ``self.destinations``: the entry of a model that is simulated here is the
      model object itself, the entry of any other model is its ``location``.

      :param model: the model to set
      :param model_ids: list containing all models in order of their model_ids
      :param scheduler_type: string representation of the scheduler to use
      :param flattened: whether or not the model had its ports decoupled from the models to allow pickling
      """
      self.flattened = flattened
      if flattened:
          model.unflattenConnections()
      self.total_model = model
      self.model_ids = model_ids
      self.destinations = [None] * len(self.model_ids)
      local = []
      for index, atomic in enumerate(self.model_ids):
          if atomic.location == self.name:
              # Model is simulated here
              self.destinations[index] = atomic
              local.append(atomic)
          else:
              # Model lives elsewhere: only remember where
              self.destinations[index] = atomic.location
      self.local = local
      # NOTE self.model is left untouched if the model is neither coupled nor atomic
      if isinstance(model, CoupledDEVS):
          self.model = RootDEVS(self.local, model.component_set, scheduler_type)
      elif isinstance(model, AtomicDEVS):
          self.model = RootDEVS(self.local, [model], scheduler_type)
      self.activities = {}
  def migrateTo(self, destination, model_ids):
      """
      Migrate all models to a new destination

      All other kernels (except this one and the destination) are notified of the
      migration, the pending input messages and the current state of the models are
      sent to the destination, and the local representation of the models is cleared.

      :param destination: destination of all models specified hereafter
      :param model_ids: iterable containing all models to migrate simultaneously
      :raises DEVSException: if one of the models is not local to this kernel, or is not relocatable
      """
      # Assumes that the simlock is already acquired
      # Make sure that the model that we are migrating is local here
      #assert info("Migrating " + str(model_ids) + " to " + str(destination))
      models = set()
      for model_id in model_ids:
          # An integer entry in destinations denotes a remote location instead of a local model
          if isinstance(self.destinations[model_id], int):
              raise DEVSException(
                  "Cannot migrate model that is not local to the source!")
          if not self.destinations[model_id].relocatable:
              raise DEVSException(
                  "Model %s is fixed and is not allowed to be relocated"
                  % self.destinations[model_id].getModelFullName())
          models.add(self.destinations[model_id])
      destination = int(destination)
      if destination == self.name:
          # Model is already there...
          return
      #assert info("Migration approved of %s from node %d to node %d" % (model_ids, self.name, destination))
      for model in models:
          # All models are gone here, so remove them from the scheduler
          self.model.scheduler.unschedule(model)
      # Tell all kernels that are not involved in the migration about the new location
      for i in range(self.kernels):
          if i != destination and i != self.name:
              self.getProxy(i).notifyMigration(model_ids, destination)
      remote = self.getProxy(destination)
      # NOTE Due to the reverts, the output queue will be completely empty because:
      #       - messages before the GVT are cleaned up due to fossil collection
      #       - messages after the GVT are cleaned up due to the revert that sends anti-messages
      #      Furthermore, the state vector will be as small as possible to reduce the amount of data that has to be transferred
      #      The input queue requires some small processing: all future incoming messages for the model that gets migrated
      #      need to be found. The processed messages list should be empty, for the same reason as the output queue.
      remote.messageTransfer(self.input_scheduler.extract(model_ids))
      # One entry per model: (model_id, (time_last, time_next, state))
      bundled_models = [
          (m.model_id, (m.time_last, m.time_next, m.state)) for m in models]
      #TODO clean up this code to use the bundling somewhat more efficient
      remote.activateModels(bundled_models)
      for model in models:
          # No need to ask the new node whether or not there are specific nodes that also have to be informed
          #remote.activateModel(model.model_id, (model.time_last, model.time_next, model.state))
          # Delete our representation of the model
          model.state = None
          model.old_states = []
          del self.activities[model.model_id]
      # Remove the model from the component_set of the RootDEVS
      components = self.model.component_set
      self.model.component_set = [m for m in components if m not in models]
      for model_id in model_ids:
          self.model.local_model_ids.remove(model_id)
          self.destinations[model_id] = destination
          self.model_ids[model_id].location = destination
      # Now update the time_next and time_last values here
      self.model.setTimeNext()
  def notifyMigration(self, model_ids, destination):
      """
      Register that a set of models has moved to another kernel.

      :param model_ids: the model_ids that get moved
      :param destination: the node location that now hosts these models
      """
      # Being notified about a migration to ourselves should not happen;
      # nothing is updated in that case
      if destination != self.name:
          for moved_id in model_ids:
              self.destinations[moved_id] = destination
              moved_model = self.model_ids[moved_id]
              moved_model.location = destination
  def requestMigrationLock(self):
      """
      Request this kernel to lock itself ASAP to allow a relocation to happen. This will invoke the *notifyLocked* method on the controller as soon as locking succeeded.

      The simulation lock is acquired here and is NOT released by this method; see :func:`migrationUnlock`.
      """
      # Register ourselves in the prior counter and clear the accompanying event
      with self.priorlock:
          self.priorcount += 1
          self.priorevent.clear()
      self.relocation_pending = True
      # Blocks until the simulation lock is available
      self.simlock.acquire()
      with self.Vlock:
          # Roll back to the GVT
          self.revert((self.gvt, 0))
      # Kernel 0 is informed that this kernel is now locked
      self.getProxy(0).notifyLocked(self.name)
  def migrationUnlock(self):
      """
      Unlocks the simulation lock remotely.

      Undoes the bookkeeping done in :func:`requestMigrationLock`: decrements the prior counter
      (setting the event again when it reaches zero), clears the relocation flags and releases the simulation lock.

      .. warning:: do not use this function, unless you fully understand what you are doing!
      """
      with self.priorlock:
          self.priorcount -= 1
          if self.priorcount == 0:
              # Nobody is registered in the prior counter any more
              self.priorevent.set()
      self.prevtime_finished = False
      self.relocation_pending = False
      # The lock is expected to be held, as it is acquired in requestMigrationLock
      self.simlock.release()
  def activateModels(self, bundle):
      """
      Activate a whole series of models by calling :func:`activateModel` for each of them.

      :param bundle: iterable of (model_id, current_state) pairs, one for each model to activate
      """
      for identifier, snapshot in bundle:
          self.activateModel(identifier, snapshot)
  def activateModel(self, model_id, current_state):
      """
      Activate the model at this kernel, thus allowing the kernel to use (and schedule) this model.

      Note that a revert to the GVT has to happen before calling this function, since the old_states
      are not transferred and thus reverting is impossible.

      :param model_id: the id of the model that has to be activated
      :param current_state: the current state of the model that gets migrated, indexed as
                            (time_last, time_next, state)
      """
      new_model = self.model_ids[model_id]
      new_model.location = self.name
      self.model.component_set.append(new_model)
      self.model.local_model_ids.add(new_model.model_id)
      new_model.time_last = current_state[0]
      new_model.time_next = current_state[1]
      new_model.state = current_state[2]
      # The state history is not transferred, so it restarts with a single saved state
      new_model.old_states = [self.state_saver(new_model.time_last,
                                               new_model.time_next,
                                               new_model.state,
                                               0.0,
                                               {},
                                               0.0)]
      # It is a new model, so add it to the scheduler too
      self.model.scheduler.schedule(new_model)
      # From now on, the destination of this model is the local model object itself
      self.destinations[model_id] = new_model
      self.model.setTimeNext()
      self.activities[model_id] = 0.0
  def messageTransfer(self, extraction):
      """
      Take over the messages that accompany a model transfer by inserting them
      in the input scheduler of this kernel.

      :param extraction: the extraction generated by the *messageScheduler*
      """
      scheduler = self.input_scheduler
      scheduler.insert(extraction, self.model_ids)
  def notifySend(self, destination, timestamp, color):
      """
      Register that a message was sent; this bookkeeping is needed for the GVT calculation.

      :param destination: the name of the simulation kernel that will receive the sent message
      :param timestamp: simulation time at which the message is sent
      :param color: color of the message being sent (for Mattern's algorithm)
      """
      self.msg_sent += 1
      # One more message of this color is underway to the destination
      sent_counts = self.V[color]
      sent_counts[destination] = sent_counts.get(destination, 0) + 1
      # Only colors 1 and 3 (the red ones) contribute to Tmin
      if color in (1, 3):
          self.Tmin = min(self.Tmin, timestamp)
  def notifyReceive(self, color):
      """
      Register that a message was received; this bookkeeping is needed for the GVT calculation.

      :param color: the color of the received message (for Mattern's algorithm)
      """
      self.msg_recv += 1
      # One message less of this color is underway to this kernel
      own_name = self.name
      received_counts = self.V[color]
      received_counts[own_name] = received_counts.get(own_name, 0) - 1
      # Wake up whoever is waiting for a change of this vector
      self.Vchange[color].set()
  def waitUntilOK(self, vector):
      """
      Block until all messages counted in the given vector have been received by this kernel.

      Needed for Mattern's algorithm. The Vlock has to be held by the caller: it is
      released while waiting and acquired again before returning. Events are used to
      prevent busy looping.

      :param vector: index in ``self.V`` (and ``self.Vchange``) of the vector to wait for
      :returns: always False
      """
      def outstanding():
          # Own count, combined with the count that arrived in the control message
          local_count = self.V[vector].get(self.name, 0)
          announced_count = self.control_msg[2].get(self.name, 0)
          return local_count + announced_count

      while outstanding() > 0:
          self.Vlock.release()
          # Sleep until a message for this vector is received
          self.Vchange[vector].wait()
          self.Vchange[vector].clear()
          # Take the lock again before looking at the counters
          self.Vlock.acquire()
      return False
  def receiveControl(self, msg, first=False):
      """
      Receive a GVT control message and process it. Method will block until the GVT is actually found, so make this an asynchronous call, or run it on a separate thread.

      This code implements Mattern's algorithm with a slight modification: it uses 4 different colours to distinguish two subsequent runs. Furthermore, it always requires 2 complete passes before a GVT is found.

      :param msg: the control message, indexed as [clock, send, waiting vector, accumulating vector]
      :param first: whether or not this is the initial control message; only has an effect at kernel 0
      :raises DEVSException: at kernel 0, if the waiting vector is not all zero or if the computed GVT is lower than the current one
      """
      self.control_msg = msg
      m_clock = self.control_msg[0]
      m_send = self.control_msg[1]
      waiting_vector = self.control_msg[2]
      accumulating_vector = self.control_msg[3]
      with self.Vlock:
          # Colours cycle through 0..3, so the predecessor of 0 is 3
          prevcolor = 3 if self.color == 0 else self.color - 1
          color = self.color
          # Only kernel 0 can conclude the algorithm, and only when its colour is 0 or 2
          finished = (self.name == 0 and
                      not first and
                      (color == 0 or color == 2))
          if self.name == 0 and not first:
              # The message made a complete pass: all waited-for messages have to be accounted for
              if not allZeroDict(waiting_vector):
                  raise DEVSException("GVT bug detected")
              # The accumulated counts become the counts to wait for in the next pass
              waiting_vector = accumulating_vector
              self.control_msg[2] = accumulating_vector
              accumulating_vector = {}
          if finished:
              from math import floor
              gvt = floor(min(m_clock, m_send))
              print("Got GVT")
              if gvt < self.gvt:
                  raise DEVSException("GVT is decreasing")
              self.accumulator = waiting_vector
              use_last_state = self.relocator.useLastStateOnly()
              # Start the setGVT pass through our own proxy, with an empty activities list
              self.getProxy(self.name).setGVT(gvt,
                                              [],
                                              use_last_state)
              return
          else:
              # Block until all messages of the previous colour have arrived here
              self.waitUntilOK(prevcolor)
              # Add our own counts to the message and reset them locally
              addDict(waiting_vector, self.V[prevcolor])
              addDict(accumulating_vector, self.V[color])
              self.V[prevcolor] = {}
              self.V[color] = {}
              if not self.prevtime_finished:
                  localtime = self.prevtime[0]
              else:
                  # prevtime_finished is set, so our local time is taken to be infinity
                  localtime = float('inf')
              # Kernel 0 restarts the minimum with its own time, the others combine it with the received one
              ntime = localtime if self.name == 0 else min(m_clock, localtime)
              msg = [ntime,
                     min(m_send, self.Tmin),
                     waiting_vector,
                     accumulating_vector]
              self.Tmin = float('inf')
              self.color = (self.color + 1) % 4
              # Pass the updated control message on to the next kernel
              self.next_LP.receiveControl(msg)
  def setIrreversible(self):
      """
      Flag this node as **temporary** irreversible; the flag can simply be removed again later on.

      Useful when relocation made all nodes run at a single node, while future
      relocations might move some of them away again.
      """
      enabled = True
      self.temporary_irreversible = enabled
  def unsetIrreversible(self):
      """
      Remove the **temporary** irreversible flag of this node.
      """
      enabled = False
      self.temporary_irreversible = enabled
  def setGVT(self, gvt, activities, last_state_only):
      """
      Sets the GVT of this simulation kernel. This value should not be smaller than
      the current GVT (this would be impossible for a correct GVT calculation). Also
      cleans up the input, output and state buffers used due to time-warp.
      Furthermore, it also processes all messages scheduled before the GVT.

      If the GVT is equal to the current one, no cleanup happens: pending relocations
      are handled, :func:`GVTdone` is called and the call is not forwarded any further.
      Otherwise, the call is forwarded to the next kernel after the local cleanup.

      :param gvt: the desired GVT
      :param activities: the activities of all separate nodes as a list of (name, activity) tuples
      :param last_state_only: whether or not all states should be considered or only the last
      :raises DEVSException: if the GVT is lower than the current GVT
      """
      # GVT is just a time, it does not contain an age field!
      if gvt < self.gvt:
          raise DEVSException("GVT cannot decrease from %s to %s!"
                              % (self.gvt, gvt))
      if gvt == self.gvt:
          # The same, so don't do the batched fossil collection
          # This will ALWAYS happen at the controller first, as this is the one that gets called with the GVT update first
          #  if the value should change, it will do a complete round and finally set the variable
          #  if the value stays the same, we can stop immediately
          if self.initial_allocator is not None:
              if gvt >= self.initial_allocator.getTerminationTime():
                  # The initial allocator period is over, so switch to normal simulation
                  relocs = self.getInitialAllocations()
                  # Possibly, the locations are altered, so reset everything
                  for model in self.model.component_set:
                      model.location = 0
                  # Function pointers, so CamelCase
                  self.atomicOutputGeneration = self.atomicOutputGeneration_backup
                  self.performRelocationsInit(relocs)
              # Clear activities for now, as we don't want activity relocation meddling in our affairs
              activities = []
          if activities:
              if self.old_gvt == -float('inf'):
                  self.old_gvt = 0.
              horizon = self.gvt - self.old_gvt
              if self.gvt != self.old_gvt and activities[0][1] is not None:
                  # One line per GVT interval: its midpoint, followed by the activity
                  # of each node divided by the length of the interval.
                  # The context manager closes the log even if one of the writes fails.
                  with open("activity-log", 'a') as f:
                      f.write(str((self.gvt - self.old_gvt) / 2 + self.old_gvt))
                      for _, a in activities:
                          f.write(" %s" % (a / horizon))
                      f.write("\n")
              self.findAndPerformRelocations(gvt, activities, horizon)
          # Otherwise: there was no pass in the GVT ring, indicating that no GVT progress was made
          #  This also indicates that the activities will NOT be reset
          self.GVTdone()
          return
      # Flag that somebody is waiting for the simulation lock
      self.simlock_request = True
      with self.simlock:
          self.simlock_request = False
          self.old_gvt = self.gvt
          self.gvt = gvt
          self.input_scheduler.cleanup((gvt, 1))
          self.performActions(gvt)
          # Drop all messages before the first one with a timestamp at or after the GVT;
          # if there is no such message, the complete queue is dropped
          for index, message in enumerate(self.output_queue):
              if message.timestamp[0] >= gvt:
                  self.output_queue = self.output_queue[index:]
                  break
          else:
              self.output_queue = []
          self.activities = {}
          self.model.setGVT(gvt, self.activities, last_state_only)
          addDict(self.total_activities, self.activities)
          if self.temporary_irreversible:
              # Replace the state history of each model by a single state
              for model in self.model.component_set:
                  activity = self.total_activities[model.model_id]
                  model.old_states = [self.state_saver(model.time_last,
                                                       model.time_next,
                                                       model.state,
                                                       activity,
                                                       None,
                                                       None)]
              #TODO this is commented...
              #self.totalActivities = defaultdict(float)
          # Make a checkpoint too
          if self.checkpoint_counter == self.checkpoint_freq:
              self.checkpoint()
              self.checkpoint_counter = 0
          else:
              self.checkpoint_counter += 1
      # Move the pending activities
      if last_state_only:
          activity_sum = None
      else:
          activity_sum = sum(self.activities.values())
      activities.append((self.name, activity_sum))
      self.next_LP.setGVT(gvt, activities, last_state_only)
  def revert(self, time):
      """
      Revert the current simulation kernel to the specified time. All messages
      sent after this time will be invalidated, all states produced after this
      time will be removed.

      :param time: the desired time for the revert, as a tuple of which the first element is compared to the GVT
      :raises DEVSException: if the time is before the GVT, or if an invalidated message was destined for a model that is local

      .. note:: Clearly, this time should be >= the current GVT
      """
      # Don't assert that it is not irreversible, as an irreversible component could theoretically still be reverted, but this MUST be to a state when it was not yet irreversible
      # Reverting the complete LP
      if time[0] < self.gvt:
          raise DEVSException("Reverting to time %f, before the GVT (%f)!"
                              % (time[0], self.gvt))
      #assert debug("Removing actions from time " + str(time))
      self.transitioning = defaultdict(int)
      self.reverts += 1
      #assert debug("Revert to time " + str(time) + ", clock = " + str(self.clock))
      if self.do_some_tracing:
          # Ask kernel 0 to remove the actions of our local models
          self.getProxy(0).removeActions(self.model.local_model_ids, time)
      # Also revert the input message scheduler
      self.input_scheduler.revert(time)
      # Now revert all local models
      controller_revert = self.model.revert(time, self.memoization)
      #assert debug("Reverted all models")
      self.clock = self.prevtime = time
      # Invalidate all output messages after or at time
      # end is the index of the last message that is kept (-1 if none is kept)
      end = -1
      # Per destination model: the uuids to cancel and the lowest timestamp among them
      unschedules = {}
      unschedules_mintime = {}
      for index, value in enumerate(self.output_queue):
          # Do not invalidate messages at this time itself, as they are processed in this time step and not generated in this timestep
          if value.timestamp > time:
              model_id = value.destination
              mintime = unschedules_mintime.get(model_id,
                                                (float('inf'), 0))
              unschedules_mintime[model_id] = min(mintime, value.timestamp)
              unschedules.setdefault(model_id, []).append(value.uuid)
          else:
              #assert debug("NOT invalidating " + str(value.uuid))
              end = index
      self.output_queue = self.output_queue[:end+1]
      # send() compares block_outgoing with the timestamp of a message to decide whether to skip it
      try:
          self.block_outgoing = self.output_queue[-1].timestamp
      except IndexError:
          self.block_outgoing = None
      # Don't need the Vlock here, as we already have it
      for model_id in unschedules:
          dest_kernel = self.destinations[model_id]
          # A non-integer entry in destinations is a local model object
          if not isinstance(dest_kernel, int):
              raise DEVSException("Revertion due to relocation to self... This is impossible!")
          mintime = unschedules_mintime[model_id]
          # Assume we have the simlock already
          # The batch of anti messages is counted as a single sent message
          self.notifySend(dest_kernel, mintime[0], self.color)
          self.getProxy(dest_kernel).receiveAntiMessages(mintime,
                                                         model_id,
                                                         unschedules[model_id],
                                                         self.color)
      # Controller has read one of the reverted states, so force a rollback there
      if controller_revert:
          self.notifySend(0, time[0], self.color)
          self.getProxy(0).receiveAntiMessages(time, None, [], self.color)
      self.should_run.set()
  def send(self, model_id, timestamp, content):
      """
      Wrap the content in a network message and send it to the kernel that hosts the destination model.

      :param model_id: the id of the model that has to receive the message
      :param timestamp: timestamp of the message
      :param content: content of the message being sent
      """
      blocked = self.block_outgoing == timestamp
      if blocked and not self.checkpoint_restored:
          # The model was just reverted: these messages are still present in the
          # queues of the receivers, so there is no need to send them again
          return
      self.checkpoint_restored = False

      target_kernel = self.destinations[model_id]
      # NOTE the Vlock is already acquired by the sender
      message = NetworkMessage(timestamp,
                               content,
                               self.genUUID(),
                               self.color,
                               model_id)
      # Assume we have the simlock
      self.notifySend(target_kernel, message.timestamp[0], message.color)
      # The message itself is stored instead of a copy: the middleware makes its own
      # copy and the receiver copies the message too, so a copy at the source is unnecessary
      self.output_queue.append(message)
      self.getProxy(target_kernel).receive(message)
  def receive(self, msg):
      """
      Hand a message to this kernel.

      The message is only queued here, so that the method returns as soon as possible
      and the number of pending calls stays low. Taking the locks here would be
      impractical, as the incoming messages are processed one at a time elsewhere.

      :param msg: a NetworkMessage to process
      """
      # NOTE ports could change at run-time, though this is not a problem in distributed simulation!
      # NOTE appending to a deque is atomic, so no lock is taken
      pending = self.inqueue
      pending.append(msg)
      # Signal that there is work to do
      self.should_run.set()
  def processIncommingMessages(self):
      """
      Process all incoming messages and return.

      This is part of the main simulation loop instead of being part of the message receive method, as we require the simlock for this. Acquiring the simlock elsewhere might take some time!

      Messages for a model that is not local are forwarded to the kernel listed in ``self.destinations``;
      all other messages are scheduled in the input scheduler, after a revert if their timestamp is not after ``self.prevtime``.
      """
      while self.inqueue:
          msg = self.inqueue.popleft()
          dest_model = msg.destination
          if dest_model not in self.model.local_model_ids:
              # NOTE do it this way to make sure that anti message properties are conserved
              #      furthermore, it prevents the message from being invalidated
              # Count the message as received with its old colour and as sent with our current colour
              self.notifyReceive(msg.color)
              dest = self.destinations[dest_model]
              msg.color = self.color
              # No need to reencode the data, as it was still encoded
              self.notifySend(dest, msg.timestamp[0], self.color)
              self.getProxy(dest).receive(msg)
              continue
          #assert debug("Processing external msg: " + str(msg))
          model = self.model_ids[dest_model]
          # Replace each key of the content by the corresponding entry of model.ports
          msg.content = {model.ports[e]: msg.content[e] for e in msg.content}
          if msg.timestamp <= self.prevtime:
              # Timestamp is before the prevtime
              # so set the prevtime back in the past
              self.revert(msg.timestamp)
          elif self.prevtime_finished:
              # The prevtime is irrelevant, as we have finished simulation
              self.prevtime = msg.timestamp
              self.prevtime_finished = False
          self.notifyReceive(msg.color)
          # Now the message is an 'ordinary' message, just schedule it for processing
          self.input_scheduler.schedule(msg)
          self.model.time_next = min(self.model.time_next, msg.timestamp)
  def receiveAntiMessages(self, mintime, model_id, uuids, color):
      """
      Process a (possibly huge) batch of anti messages for the same model

      :param mintime: the lowest timestamp of all messages being cancelled
      :param model_id: the model_id of the receiving model whose messages need to be negated, None to indicate a general rollback
      :param uuids: list of all uuids to cancel
      :param color: color for Mattern's algorithm

      .. note:: the *model_id* is only required to check whether or not the model is still local to us
      """
      # Important that this function is called oneway, as it can be called in such a way that it deadlocks otherwise
      # Register ourselves in the prior counter; this is undone in the finally clause
      with self.priorlock:
          self.priorcount += 1
          self.priorevent.clear()
      try:
          with self.simlock:
              with self.Vlock:
                  if (model_id not in self.model.local_model_ids and
                          model_id is not None):
                      # The model is not local: forward the anti messages to the kernel listed in destinations
                      self.notifyReceive(color)
                      destination = self.destinations[model_id]
                      self.getProxy(destination).receiveAntiMessages(mintime,
                                                                     model_id,
                                                                     uuids,
                                                                     self.color)
                      self.notifySend(destination, mintime[0], self.color)
                      return
                  if mintime <= self.prevtime:
                      # Timestamp is before the prevtime
                      # so set the prevtime back in the past
                      self.revert(mintime)
                  elif self.prevtime_finished:
                      # The prevtime is irrelevant, as we have finished simulation
                      self.prevtime = mintime
                      self.prevtime_finished = False
                  self.notifyReceive(color)
                  # A model_id of None is a general rollback, which has no messages to unschedule
                  if model_id is not None:
                      self.input_scheduler.massUnschedule(uuids)
      finally:
          with self.priorlock:
              self.priorcount -= 1
              if self.priorcount == 0:
                  self.priorevent.set()
  def check(self):
      """
      Decide whether or not simulation should stop now. Depending on the
      configuration, this uses either the global termination time or the
      local termination condition.

      The termination time check is a lot FASTER and should be preferred
      whenever possible.

      :returns: bool -- True to stop simulation, False to continue
      """
      if self.prevtime[1] > 1000:
          # Too many transitions at the very same simulation time
          raise DEVSException("Maximal number of 0 timeAdvance loops detected")

      if not self.termination_time_check:
          # Termination condition: only ran at the controller, as this is the
          # only kernel that has a termination condition
          done = (self.prevtime[0] == float('inf') or
                  self.termination_condition(self.prevtime, self.total_model))
          if done:
              if not self.finish_sent:
                  # Announce the stop time once
                  self.finishAtTime((self.prevtime[0], self.prevtime[1] + 1))
                  self.finish_sent = True
              return True
          if self.finish_sent:
              # The condition no longer holds, so retract the announced stop time
              self.finishAtTime((float('inf'), float('inf')))
              self.finish_sent = False
          return False

      # Termination time: continue as long as the next transition is not past it
      if not (self.model.time_next > self.termination_time):
          return False
      try:
          # A pending external message can still require processing
          timestamp = self.input_scheduler.readFirst().timestamp
          return timestamp > self.termination_time
      except IndexError:
          # No message waiting to be processed, so finished
          return True
  def finishAtTime(self, clock):
      """
      Tell every kernel that it may stop at the provided time

      :param clock: the time to stop at
      """
      kernel = 0
      while kernel < self.kernels:
          self.getProxy(kernel).setTerminationTime(clock)
          kernel += 1
  def massDelayedActions(self, time, msgs):
      """
      Register several delayed actions for the same time in one call.
      Mainly implemented to reduce the number of round trips when tracing.

      :param time: the time at which the action should happen
      :param msgs: list containing elements of the form (model_id, action)
      """
      for entry in msgs:
          model_id, action = entry
          self.delayedAction(time, model_id, action)
  def delayedAction(self, time, model_id, action):
      """
      Queue an irreversible action (I/O, prints, global messages, ...). These
      actions will be performed in the order in which they would be generated
      in a non-distributed simulation. A queued action might still be removed
      again if the calling model is reverted.

      :param time: the simulation time at which this command was requested
      :param model_id: the model_id of the model that requested this command
      :param action: the actual command to be executed as soon as it is safe
      """
      if time[0] < self.gvt:
          raise DEVSException("Can't execute action (%s) before the GVT (%s)!"
                              % (time, self.gvt))

      # In irreversible simulation, everything queued so far can be executed
      # as soon as an action for a later time shows up
      if self.irreversible and self.actions:
          last_time = self.actions[-1][0]
          if time > last_time:
              self.performActions()

      # The append itself is atomic, but other operations on the list are not,
      # so take the lock anyway
      entry = [time, model_id, action]
      with self.actionlock:
          self.actions.append(entry)
  def removeActions(self, model_ids, time):
      """
      Drop all queued actions of the specified models, starting from a specified time.
      This function should be called when the models are reverted and their actions
      have to be undone

      :param model_ids: the model_ids of all reverted models
      :param time: time starting from which the actions are removed
      """
      if time[0] < self.gvt:
          raise DEVSException("Cannot remove action (%s) before the GVT (%s)!"
                              % (time, self.gvt))
      # The actions are unsorted, so the complete list has to be traversed
      with self.actionlock:
          kept = []
          for entry in self.actions:
              reverted = (entry[1] in model_ids) and (entry[0] >= time)
              if not reverted:
                  kept.append(entry)
          self.actions = kept
  def performActions(self, gvt = float('inf')):
      """
      Perform all irreversible actions up to the provided time.
      If time is not specified, all queued actions will be executed (in case simulation is finished)

      Each queued entry has the form [time, model_id, action]; the action is
      handed to exec() within this method's scope.

      :param gvt: the time up to which all actions should be executed
      """
      if (gvt >= self.termination_time[0] and
              self.termination_condition is not None):
          # But crop of to the termination_time as we might have simulated slightly too long
          gvt = self.termination_time[0] + EPSILON
      with self.actionlock:
          if gvt != float('inf'):
              # Only take the relevant part to sort, this will decrease complexity
              lst = []
              remainder = []
              for i in self.actions:
                  # i[0] is the action's time, i[0][0] its first component
                  if i[0][0] < gvt:
                      lst.append(i)
                  else:
                      remainder.append(i)
          else:
              # No bound given: everything gets executed and nothing remains queued
              lst = self.actions
              remainder = []
          self.actions = remainder
      # Release the lock ASAP, to allow other actions to be performed
      # Sort on time first, then on MESSAGE, not on model
      lst.sort(key=lambda i: [i[0], i[2]])
      # Now execute each action in order
      for i in lst:
          # NOTE(review): the action is executed as code in this scope, so it can
          # see the local names of this method (e.g. self) -- keep them stable
          exec(i[2])
  def removeTracers(self):
      """
      Forget all currently registered tracers by installing a fresh, empty tracer container.
      The old tracers are not cleaned up here, as this should already be done by the code at the end of the simulation.
      """
      empty_tracers = Tracers()
      self.tracers = empty_tracers
  def setGlobals(self,
                 address,
                 loglevel,
                 checkpoint_frequency,
                 checkpoint_name,
                 statesaver,
                 kernels,
                 msg_copy,
                 memoization,
                 tracers):
      """
      Configure all 'global' variables for this kernel

      :param address: address of the syslog server
      :param loglevel: level of logging library
      :param checkpoint_frequency: frequency at which checkpoints should be made
      :param checkpoint_name: name of the checkpoint to save
      :param statesaver: statesaving method, as an integer key (0 up to 6) selecting the state saver class
      :param kernels: number of simulation kernels in total
      :param msg_copy: message copy method
      :param memoization: use memoization or not
      :param tracers: iterable of tracers to register at this kernel
      """
      # Tracing
      for tracer in tracers:
          self.tracers.registerTracer(tracer, self.server, self.checkpoint_restored)
      self.do_some_tracing = self.tracers.hasTracers()

      # Logging configuration
      self.address = address
      self.loglevel = loglevel

      # Ring of kernels: our successor is the kernel with the next name
      self.kernels = kernels
      successor = (self.name + 1) % kernels
      self.next_LP = self.getProxy(successor)
      # A single kernel means that nothing ever has to be reverted
      self.irreversible = self.kernels == 1
      self.temporary_irreversible = self.irreversible

      # State saving
      savers = {
          0: DeepCopyState,
          1: PickleZeroState,
          2: PickleHighestState,
          3: CopyState,
          4: AssignState,
          5: CustomState,
          6: MarshalState,
      }
      self.state_saver = savers[statesaver]
      # The integer value is kept too, as it is needed for checkpointing
      self.state_saving = statesaver

      self.msg_copy = msg_copy
      setLogger(self.name, address, loglevel)

      # Checkpointing
      self.checkpoint_name = checkpoint_name
      self.checkpoint_freq = checkpoint_frequency
      self.checkpoint_counter = 0

      self.memoization = memoization
  def processMessage(self, clock):
      """
      Look at the first external message: if it comes before the clock, the clock is moved back to it.
      All messages scheduled at the resulting clock are routed to their destination models.

      :param clock: timestamp of the next internal transition
      :returns: timestamp of the next transition, taking into account external messages
      """
      scheduler = self.input_scheduler
      try:
          message = scheduler.readFirst()
      except IndexError:
          # Nothing is scheduled, so the clock stays as it is
          return clock

      if message.timestamp < clock:
          # The message comes before the time_next, so it determines the clock
          clock = message.timestamp

      try:
          while (abs(clock[0] - message.timestamp[0]) < EPSILON and
                 clock[1] == message.timestamp[1]):
              for port in message.content:
                  receiver = port.host_DEVS
                  receiver.my_input.setdefault(port, []).extend(message.content[port])
                  # Flag the receiving model in the transitioning map (bit value 2)
                  self.transitioning[receiver] |= 2
              scheduler.removeFirst()
              message = scheduler.readFirst()
      except IndexError:
          # The scheduler ran empty, so all messages are handled
          pass
      return clock
  def realtimeWait(self):
      """
      Perform the waiting for input required in realtime simulation.
      The time_next of the model will be updated accordingly and all messages will be routed.

      An interrupt is a string of the form "<portname> <value>". An interrupt
      that does not contain a value (it cannot be split) stops the calling
      thread through sys.exit(0).

      :returns: bool -- True if a wait was registered at the threading backend (with runsim as the function to call afterwards), False if a transition has to be performed right now
      """
      # NOTE a scale of 2 means that simulation will take twice as long
      self.performActions()

      # Wait for the determined period of time
      current_realtime_time = (time.time() - self.rt_zerotime)
      scaled_realtime_time = current_realtime_time / self.realtime_scale
      self.asynchronous_generator.checkInterrupt(scaled_realtime_time)
      next_sim_time = min(self.model.time_next[0],
                          self.termination_time[0],
                          self.asynchronous_generator.getNextTime())

      # Scaled realtime
      next_sim_time *= self.realtime_scale

      # Subtract the time that we already did our computation
      wait_time = next_sim_time - current_realtime_time

      if wait_time <= 0:
          # Do the transition directly, but every time the counter drops
          # below zero, go through the threading backend with a short wait
          self.realtime_counter -= 1
          if self.realtime_counter < 0:
              self.realtime_counter = 100
              self.threading_backend.wait(0.01, self.runsim)
              return True
          # Transition
          #NOTE actually, we should set the time_next here too...
          #     otherwise we will always have a 'perfect' time_next
          return False

      interrupt = self.threading_backend.getInterrupt()
      if interrupt is None:
          # Nothing to process yet, so wait
          self.realtime_counter = 100
          if wait_time == float('inf') and getattr(self, "accept_external_input", False):
              # Nothing is scheduled, but external input is accepted: wait for a short period only
              self.threading_backend.wait(0.01, self.runsim)
          else:
              self.threading_backend.wait(wait_time, self.runsim)
          return True

      try:
          info = interrupt.split(" ")
          if len(info) < 2:
              # No value was passed along with the port name. Previously this
              # ended in an uncaught IndexError on the empty value below.
              raise ValueError("Interrupt without event value: %r" % (interrupt,))
          portname = info[0]
          event_value = " ".join(info[1:])
          if event_value.startswith(("(", "[", "{")):
              # also allow tuples, lists or dictionaries
              # NOTE(review): eval on externally provided input executes
              # arbitrary code; only use this with trusted input sources
              try:
                  event_value = eval(event_value)
              except Exception:
                  # Best effort only: keep the raw string if it is no valid literal
                  pass
          event_port = self.portmap[portname]
      except ValueError:
          # Couldn't split, means we should stop
          import sys
          sys.exit(0)

      # Process the input
      #NOTE no distinction between PDEVS and CDEVS is necessary, as CDEVS is internally handled just like PDEVS
      #     wrappers are provided to 'unpack' the list structure
      if isinstance(event_port.host_DEVS, AtomicDEVS):
          msg = {event_port: [event_value]}
          event_port.host_DEVS.my_input = msg
          self.transitioning[event_port.host_DEVS] = 2
          time_diff = time.time() - self.rt_zerotime
          self.model.time_next = (time_diff / self.realtime_scale, 1)
      else:
          # The port belongs to a Coupled DEVS model, so pass the event on to everything its port is connected to
          time_diff = time.time() - self.rt_zerotime
          tn = time_diff / self.realtime_scale
          for p in event_port.outline:
              # Translate the event if a translation function is attached to the connection
              z = event_port.z_functions[p]
              ev = event_value if z is None else z(event_value)
              msg = {p: [ev]}
              p.host_DEVS.my_input = msg
              self.transitioning[p.host_DEVS] = 2
              self.model.time_next = (tn, 1)
      # Transition
      return False
  def runsim(self):
      """
      Run a complete simulation run. Can be run multiple times if this is required in e.g. a distributed simulation.

      Every iteration of the loop performs a single simulation step: incoming
      messages are processed, the termination check is done and, if simulation
      has to continue, output is generated and all transitions are executed.
      The loop ends when realtimeWait or check signals a stop, or when the
      next transition time is infinity.
      """
      self.realtime_counter = 100

      while 1:
          if self.realtime and self.realtimeWait():
              # Make implicit use of shortcut evaluation,
              # The realtimeWait method will return True if it forces a simulation stop
              break

          if self.inqueue:
              # Messages were queued, so process them while holding both locks
              with self.simlock:
                  with self.Vlock:
                      self.processIncommingMessages()

          # Block for as long as the priority event is cleared
          self.priorevent.wait()

          # Only do the check here, after setting the next time of the simulation
          # All priority threads are cleared, so obtain the simulation lock ourself
          while self.simlock_request:
              time.sleep(0.00001)

          with self.simlock:
              if self.check():
                  self.prevtime_finished = True
                  break

              # Process all incomming messages
              if not self.irreversible:
                  # Check the external messages only if there is a possibility for them to arrive
                  # This is a slight optimisation for local simulation
                  tn = self.processMessage(self.model.time_next)
              else:
                  tn = self.model.time_next

              if tn[0] == float('inf'):
                  # Always break, even if the terminiation condition/time was wrong
                  self.transitioning = defaultdict(int)
                  self.prevtime_finished = True
                  break

              cDEVS = self.model

              # Round of the current clock time, which is necessary for revertions later on
              self.current_clock = (round(tn[0], 6), tn[1])

              # Don't interrupt the output generation, as these nodes WILL be marked as 'sent'
              with self.Vlock:
                  reschedule = self.coupledOutputGeneration(self.current_clock)

              try:
                  self.massAtomicTransitions(self.transitioning, self.current_clock)
                  cDEVS.scheduler.massReschedule(reschedule)
                  if self.use_DSDEVS:
                      # Check for dynamic structure simulation
                      self.performDSDEVS(self.transitioning)
              except QuickStopException:
                  # For relocations that should interrupt the simulation algorithm
                  # The simulation lock is given up while waiting on the priority event and
                  # taken again before 'continue' leaves the enclosing 'with' (which releases it)
                  self.simlock.release()
                  self.priorevent.wait()
                  self.simlock.acquire()
                  continue

              # Put this in a lock to prevent a possible infinite time_next from reading the prevtime with the old value
              with self.Vlock:
                  # Fetch the next time of a transition
                  cDEVS.setTimeNext()
                  # Clear all transitioning elements
                  self.transitioning = defaultdict(int)
                  # No longer block any output messages
                  self.block_outgoing = None
                  # self.clock now contains the time at which NO messages were sent
                  self.prevtime = self.current_clock
                  self.clock = self.model.time_next
  def finishRing(self, msg_sent, msg_recv, first_run=False):
      """
      Pass over the ring of kernels, asking each of them whether it is OK to stop
      simulation. The message counts are accumulated along the way, to check that
      no messages are yet to be processed.

      :param msg_sent: current counter for total amount of sent messages
      :param msg_recv: current counter for total amount of received messages
      :param first_run: whether or not to forward at the controller
      :returns: int -- amount of messages received and sent (-1 signals running simulation)
      """
      #NOTE the MPI backend changes None to 0, so -1 is used as the 'still running' answer
      if not self.simlock.acquire(False):
          # Somebody else holds the simulation lock, so something is still working
          return -1
      try:
          if self.should_run.isSet():
              # This kernel still has to run
              return -1
          if self.name == 0 and not first_run:
              # Back at the controller: done if nothing is still in transit
              return msg_sent if msg_sent == msg_recv else -1
      finally:
          # We got the simlock, so always give it back
          self.simlock.release()
      # Add our own counters and ask the next node for its situation
      total_sent = self.msg_sent + msg_sent
      total_recv = self.msg_recv + msg_recv
      return self.next_LP.finishRing(total_sent, total_recv)
  def checkpoint(self):
      """
      Save a checkpoint of the current basesimulator, this function will assume
      that no messages are still left in the medium, since these are obviously
      not saved by pickling the base simulator.

      The checkpoint is written to "<checkpoint_name>_<gvt rounded to 2 digits>_<name>.pdc".
      The file is opened in binary mode (pickle produces bytes) and is always
      closed again; a flattened model is unflattened even if pickling fails.
      """
      # pdc = PythonDevs Checkpoint
      filename = "%s_%s_%s.pdc" % (self.checkpoint_name,
                                   round(self.gvt, 2),
                                   self.name)
      # If the model was flattened when it was sent to this node, we will also need to flatten it while checkpointing
      if self.flattened:
          self.model.flattenConnections()
      try:
          with open(filename, 'wb') as outfile:
              pickle.dump(self, outfile)
      finally:
          if self.flattened:
              # Don't forget to unflatten!
              self.model.unflattenConnections()
  def loadCheckpoint(self):
      """
      Alert this kernel that it is restoring from a checkpoint: all state that is
      not part of the checkpoint is reset and the kernel is reverted to its GVT
      """
      # Reset the variables of the GVT algorithm
      self.color = 0
      self.transitioning = defaultdict(int)
      self.V = [{} for _ in range(4)]
      self.Tmin = float('inf')
      self.control_msg = None
      self.waiting = 0
      self.checkpoint_restored = True

      # Register the restored tracers again, on a fresh tracer container
      restored_tracers = self.tracers
      self.tracers = Tracers()
      self.setGlobals(address=self.address,
                      loglevel=self.loglevel,
                      checkpoint_frequency=self.checkpoint_freq,
                      checkpoint_name=self.checkpoint_name,
                      statesaver=self.state_saving,
                      kernels=self.kernels,
                      msg_copy=self.msg_copy,
                      memoization=self.memoization,
                      tracers=restored_tracers)

      # Still unflatten the model if it was flattened (due to pickling limit)
      if self.flattened:
          self.model.unflattenConnections()

      # Fresh message counters and priority thread administration
      self.msg_sent = 0
      self.msg_recv = 0
      self.priorcount = 0
      self.priorevent = threading.Event()
      self.priorevent.set()
      self.priorlock = threading.Lock()
      self.inqueue = deque()

      # A revertion brings us back to the GVT, but the queues are cleared first:
      #  everything that happens before GVT can be cleared by revertion
      #  everything that happens after GVT will be replicated by the external models
      # This also allows us to skip saving all this info in the pickled data
      self.output_queue = []
      self.input_scheduler = MessageScheduler()
      self.actions = []
      with self.Vlock:
          self.revert((self.gvt, 0))
  def simulate_sync(self):
      """
      Run simulate() under a different name; the separate name allows a much simpler MPI one way check
      """
      self.simulate()
  def simulate(self):
      """
      Simulate at this kernel
      """
      # A failed non-blocking acquire means that the lock was already taken
      had_lock = not self.simlock.acquire(False)
      if had_lock:
          # The lock was already held, so this is a normal simulation
          if self.gvt == -float('inf'):
              # Make sure that the GVT algorithm won't start already and see
              # that this model has nothing to simulate
              self.model.time_next = (0, 0)
              self.coupledInit()
      # Otherwise the lock was not held yet: a continuing simulation, nothing to prepare
      self.simlock.release()

      controller = self.getProxy(0)
      self.finished = False

      if self.realtime:
          # Realtime simulation consists of a single run
          self.runsim()
          self.sim_finish.set()
          return

      while True:
          self.runsim()
          if self.irreversible:
              # Nothing can make us run again
              self.should_run.clear()
              break
          # Wait until we are told to run again (or to finish)
          self.should_run.wait()
          self.should_run.clear()
          if self.finished:
              break
      self.sim_finish.set()
  def setAttr(self, model_id, attr, value):
      """
      Alter an attribute of a model.

      :param model_id: the id of the model to alter
      :param attr: string representation of the attribute to alter
      :param value: value to set
      """
      target = self.model_ids[model_id]
      setattr(target, attr, value)
  def setStateAttr(self, model_id, attr, value):
      """
      Alter an attribute of the state of a model

      :param model_id: the id of the model to alter
      :param attr: string representation of the attribute to alter
      :param value: value to set
      """
      state = self.model_ids[model_id].state
      setattr(state, attr, value)
  def startTracers(self):
      """
      Start all tracers that are registered at this kernel
      """
      self.tracers.startTracers()
  def stopTracers(self):
      """
      Stop all tracers that are registered at this kernel
      """
      self.tracers.stopTracers()
  def getGVT(self):
      """
      Fetch the GVT as known by this kernel

      :returns: float -- the current GVT
      """
      current_gvt = self.gvt
      return current_gvt
  def getTime(self):
      """
      Fetch the current time of this kernel, which is the first component of its prevtime

      :returns: float -- the current simulation time
      """
      now = self.prevtime[0]
      return now
  def getState(self, model_id):
      """
      Fetch the current state of the specified model

      :param model_id: the model_id of the model of which the state is requested
      :returns: state -- the state of the requested model
      """
      requested = self.model_ids[model_id]
      return requested.state
  def getStateAtTime(self, model_id, request_time):
      """
      Fetch the state that a model had at a specific time

      :param model_id: model_id of which the state should be fetched
      :param request_time: time of the state
      :returns: the result of the getState call on the model
      """
      requested = self.model_ids[model_id]
      return requested.getState(request_time, False)
  def genUUID(self):
      """
      Generate a unique enough ID for a message, by combining the name of this
      kernel with an increasing counter

      :returns: string -- a unique string for the specific name and number of sent messages
      """
      counter = self.send_msg_counter + 1
      self.send_msg_counter = counter
      return "%s-%s" % (self.name, counter)
  def getLocation(self, model_id):
      """
      Determine the location at which the model with the provided model_id runs

      :param model_id: the model_id of the model of which the location is requested
      :returns: int -- the number of the kernel where the provided model_id runs
      """
      destination = self.destinations[model_id]
      # An integer entry is the number of a kernel, anything else means that the model runs here
      return destination if isinstance(destination, int) else self.name
  def getActivity(self, model_id):
      """
      Fetch the activity of a certain model id from the previous iteration

      :param model_id: the model_id to check
      :returns: float -- the activity, 0.0 if none is known for this model_id
      """
      known = self.activities
      return known.get(model_id, 0.0)
  def getCompleteActivity(self):
      """
      Fetch the complete dictionary of all activities (the dictionary itself, not a copy)

      :returns: dict -- mapping of all activities
      """
      everything = self.activities
      return everything
  def getTotalActivity(self, time=(float('inf'), float('inf'))):
      """
      Fetch a dictionary containing the total activity through the complete simulation run

      :param time: time up to which to return activity
      :returns: dict -- mapping of all activities, but simulation-wide
      """
      if self.irreversible:
          # Only the accumulated totals are returned in irreversible simulation
          return self.total_activities
      # Otherwise, fetch the activity from the model and add the accumulated totals to it
      collected = {}
      self.model.fetchActivity(time, collected)
      addDict(collected, self.total_activities)
      return collected
  def recomputeTA(self, model_id, time):
      """
      Recompute the timeAdvance of a specific model and reapply it. It should only be called after the model was changed using one of the provided functions.
      The change will seemingly have happened right after the last simulation step, so your timeAdvance should **NOT** return something smaller than this.

      The actual *absolute* time_next will be determined at the time of the last transition too. If the model was not altered in a way that causes a major change
      to the state, the *elapsed* time attribute will take care of this call perfectly.

      :param model_id: the model_id of the model to reschedule
      :param time: the time at which the change happens
      """
      if not hasattr(self.model, "scheduler"):
          # coupledInit did not run yet, so this will happen automatically
          return

      altered = self.model_ids[model_id]
      # Termination time will always be correct
      altered.elapsed = time - altered.time_last[0]
      advance = altered.timeAdvance()
      altered.time_next = (time + altered.elapsed + advance, 1)
      if altered.time_next[0] < time:
          raise DEVSException("Model user modification causes a transition in the past")
      # Put the model at its new place in the scheduler and update the time_next of the root
      self.model.scheduler.massReschedule([altered])
      self.model.setTimeNext()