controller.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Controller used as a specific simulation kernel
  17. """
  18. from pypdevs.basesimulator import BaseSimulator
  19. from pypdevs.logger import *
  20. import threading
  21. import pypdevs.accurate_time as time
  22. import pypdevs.middleware as middleware
  23. from pypdevs.DEVS import CoupledDEVS, AtomicDEVS
  24. from pypdevs.util import DEVSException
  25. from pypdevs.activityVisualisation import visualizeLocations
  26. from pypdevs.realtime.threadingBackend import ThreadingBackend
  27. from pypdevs.realtime.asynchronousComboGenerator import AsynchronousComboGenerator
  28. class Controller(BaseSimulator):
  29. """
  30. The controller class, which is a special kind of normal simulation kernel. This should always run on the node labeled 0.
  31. It contains some functions that are only required to be ran on a single node, such as GVT initiation
  32. """
  33. def __init__(self, name, model, server):
  34. """
  35. Constructor
  36. :param name: name of the controller
  37. :param model: model to host at the kernel
  38. :param server: the server to make requests on
  39. """
  40. BaseSimulator.__init__(self, name, model, server)
  41. self.waiting_lock = threading.Lock()
  42. self.accumulator = {}
  43. self.no_finish_ring = threading.Lock()
  44. self.no_finish_ring.acquire()
  45. self.location_cell_view = False
  46. self.graph = None
  47. self.allocations = None
  48. self.running_irreversible = None
  49. self.initial_allocator = None
  50. self.prev_termination_time = 0.0
  51. def __setstate__(self, retdict):
  52. """
  53. For pickling
  54. :param retdict: dictionary containing attributes and their value
  55. """
  56. BaseSimulator.__setstate__(self, retdict)
  57. self.waiting_lock = threading.Lock()
  58. self.no_finish_ring = threading.Lock()
  59. self.no_finish_ring.acquire()
  60. def GVTdone(self):
  61. """
  62. Notify this simulation kernel that the GVT calculation is finished
  63. """
  64. self.wait_for_gvt.set()
  65. def isFinished(self, running):
  66. """
  67. Checks if all kernels have indicated that they have finished simulation.
  68. If each kernel has indicated this, a final (expensive) check happens to
  69. prevent premature termination.
  70. :param running: the number of kernels that is simulating
  71. :returns: bool -- whether or not simulation is already finished
  72. """
  73. # NOTE make sure that GVT algorithm is not running at the moment, otherwise we deadlock!
  74. # it might be possible that the GVT algorithm starts immediately after the wait(), causing deadlock again
  75. # Now we are sure that the GVT algorithm is not running when we start this
  76. # It seems that we should be finished, so just ACK this with every simulation kernel before proceeding
  77. # it might be possible that the kernel's 'notifyRun' command is still on the way, making the simulation
  78. # stop too soon.
  79. self.no_finish_ring.acquire()
  80. msgcount = self.finishRing(0, 0, True)
  81. if msgcount == -1:
  82. # One of the nodes was still busy
  83. self.no_finish_ring.release()
  84. return False
  85. else:
  86. msgcount2 = self.finishRing(0, 0, True)
  87. # If they are equal, we are done
  88. ret = msgcount == msgcount2
  89. if not ret:
  90. self.no_finish_ring.release()
  91. else:
  92. self.waiting = 0
  93. return ret
  94. def waitFinish(self, running):
  95. """
  96. Wait until the specified number of kernels have all told that simulation
  97. finished.
  98. :param running: the number of kernels that is simulating
  99. """
  100. while 1:
  101. time.sleep(1)
  102. # Make sure that no relocations are running
  103. if self.isFinished(running):
  104. # All simulation kernels have told us that they are idle at the moment
  105. break
  106. self.run_gvt = False
  107. self.event_gvt.set()
  108. self.gvt_thread.join()
  109. def startGVTThread(self, gvt_interval):
  110. """
  111. Start the GVT thread
  112. :param gvt_interval: the interval between two successive GVT runs
  113. """
  114. # We seem to be the controller
  115. # Start up the GVT algorithm then
  116. self.event_gvt = threading.Event()
  117. self.run_gvt = True
  118. self.gvt_thread = threading.Thread(target=Controller.threadGVT,
  119. args=[self, gvt_interval])
  120. self.gvt_thread.daemon = True
  121. self.gvt_thread.start()
  122. def threadGVT(self, freq):
  123. """
  124. Run the GVT algorithm, this method should be called in its own thread,
  125. because it will block
  126. :param freq: the time to sleep between two GVT calculations
  127. """
  128. # Wait for the simulation to have done something useful before we start
  129. self.event_gvt.wait(freq)
  130. # Maybe simulation already finished...
  131. while self.run_gvt:
  132. self.receiveControl([float('inf'),
  133. float('inf'),
  134. self.accumulator,
  135. {}],
  136. True)
  137. # Wait until the lock is released elsewhere
  138. print("Waiting for clear")
  139. self.wait_for_gvt.wait()
  140. self.wait_for_gvt.clear()
  141. # Limit the GVT algorithm, otherwise this will flood the ring
  142. print("Cleared")
  143. self.event_gvt.wait(freq)
  144. def getVCDVariables(self):
  145. """
  146. Generate a list of all variables that exist in the current scope
  147. :returns: list -- all VCD variables in the current scope
  148. """
  149. variables = []
  150. for d in self.total_model.component_set:
  151. variables.extend(d.getVCDVariables())
  152. return variables
  153. def simulate_sync(self):
  154. """
  155. Synchronous simulation call, identical to the normal call, with the exception that it will be a blocking call as only "simulate" is marked as oneway.
  156. """
  157. BaseSimulator.simulate_sync(self)
  158. self.no_finish_ring.acquire()
  159. def simulate(self):
  160. """
  161. Run the actual simulation on the controller. This will simply 'intercept' the call to the original simulate and perform location visualisation when necessary.
  162. """
  163. self.checkForTemporaryIrreversible()
  164. self.no_finish_ring.release()
  165. if self.location_cell_view:
  166. from pypdevs.activityVisualisation import visualizeLocations
  167. visualizeLocations(self)
  168. # Call superclass (the actual simulation)
  169. BaseSimulator.simulate(self)
  170. self.prev_termination_time = self.termination_time[0]
  171. def getEventGraph(self):
  172. """
  173. Fetch a graph containing all connections and the number of events between the nodes. This is only useful when an initial allocator is chosen.
  174. :returns: dict -- containing source and destination, it will return the amount of events passed between them
  175. """
  176. return self.runAllocator()[0]
  177. def getInitialAllocations(self):
  178. """
  179. Get a list of all initial allocations. Will call the allocator to get the result.
  180. :returns: list -- containing all nodes and the models they host
  181. """
  182. return self.runAllocator()[1]
  183. def runAllocator(self):
  184. """
  185. Actually extract the graph of exchanged messages and run the allocator with this information.
  186. Results are cached.
  187. :returns: tuple -- the event graph and the allocations
  188. """
  189. # Only run this code once
  190. if self.graph is None and self.allocations is None:
  191. # It seems this is the first time
  192. if self.initial_allocator is None:
  193. # No allocator was defined, or it has already issued its allocation code, which resulted into 'nothing'
  194. self.graph = None
  195. self.allocations = None
  196. else:
  197. from pypdevs.util import constructGraph, saveLocations
  198. self.graph = constructGraph(self.model)
  199. allocs = self.initialAllocator.allocate(self.model.component_set,
  200. self.getEventGraph(),
  201. self.kernels,
  202. self.total_activities)
  203. self.allocations = allocs
  204. self.initial_allocator = None
  205. saveLocations("locationsave.txt",
  206. self.allocations,
  207. self.model_ids)
  208. return self.graph, self.allocations
  209. def setCellLocationTracer(self, x, y, location_cell_view):
  210. """
  211. Sets the Location tracer and all its configuration parameters
  212. :param x: the horizontal size of the grid
  213. :param y: the vertical size of the grid
  214. :param location_cell_view: whether or not to enable it
  215. """
  216. self.x_size = x
  217. self.y_size = y
  218. self.location_cell_view = location_cell_view
  219. def setRelocator(self, relocator):
  220. """
  221. Sets the relocator to the one provided by the user
  222. :param relocator: the relocator to use
  223. """
  224. self.relocator = relocator
  225. # Perform run-time configuration
  226. try:
  227. self.relocator.setController(self)
  228. except AttributeError:
  229. pass
  230. def setActivityTracking(self, at):
  231. """
  232. Sets the use of activity tracking, which will simply output the activity of all models at the end of the simulation
  233. :param at: whether or not to enable activity tracking
  234. """
  235. self.activity_tracking = at
  236. def setClassicDEVS(self, classic_DEVS):
  237. """
  238. Sets the use of Classic DEVS instead of Parallel DEVS.
  239. :param classicDEVS: whether or not to use Classic DEVS
  240. """
  241. # Do this once, to prevent checks for the classic DEVS formalism
  242. if classic_DEVS:
  243. # Methods, so CamelCase
  244. self.coupledOutputGeneration = self.coupledOutputGenerationClassic
  245. def setAllocator(self, initial_allocator):
  246. """
  247. Sets the use of an initial relocator.
  248. :param initial_allocator: whether or not to use an initial allocator
  249. """
  250. self.initial_allocator = initial_allocator
  251. if initial_allocator is not None:
  252. # Methods, so CamelCase
  253. self.atomicOutputGeneration_backup = self.atomicOutputGeneration
  254. self.atomicOutputGeneration = self.atomicOutputGenerationEventTracing
  255. def setDSDEVS(self, dsdevs):
  256. """
  257. Whether or not to check for DSDEVS events
  258. :param dsdevs: dsdevs boolean
  259. """
  260. self.use_DSDEVS = dsdevs
  261. def setRealtime(self, input_references):
  262. """
  263. Sets the use of realtime simulation.
  264. :param input_references: dictionary containing the string to port mapping
  265. """
  266. self.realtime = True
  267. self.realtime_port_references = input_references
  268. def setTerminationCondition(self, termination_condition):
  269. """
  270. Sets the termination condition of this simulation kernel.
  271. As soon as the condition is valid, it willl signal all nodes that they have to stop simulation as soon as they have progressed up to this simulation time.
  272. :param termination_condition: a function that accepts two parameters: *time* and *model*. Function returns whether or not to halt simulation
  273. """
  274. self.termination_condition = termination_condition
  275. self.termination_time_check = False
  276. def findAndPerformRelocations(self, gvt, activities, horizon):
  277. """
  278. First requests the relocator for relocations to perform, and afterwards actually perform them.
  279. :param gvt: the current GVT
  280. :param activities: list containing all activities of all nodes
  281. :param horizon: the horizon used in this activity tracking
  282. """
  283. # Now start moving all models according to the provided relocation directives
  284. relocate = self.relocator.getRelocations(gvt, activities, horizon)
  285. #print("Filtered relocate: " + str(relocate))
  286. if relocate:
  287. self.performRelocationsInit(relocate)
  288. def performRelocationsInit(self, relocate):
  289. """
  290. Perform the relocations specified in the parameter. Split of from the 'findAndPerformRelocations', to make it possible for other parts of the code
  291. to perform relocations too.
  292. :param relocate: dictionary containing the model_id as key and the value is the node to send it to
  293. """
  294. relocate = {key: relocate[key]
  295. for key in relocate
  296. if self.model_ids[key].location != relocate[key] and
  297. self.model_ids[key].relocatable}
  298. if not relocate:
  299. return
  300. if self.running_irreversible is not None:
  301. self.getProxy(self.running_irreversible).unsetIrreversible()
  302. self.running_irreversible = None
  303. while not self.no_finish_ring.acquire(False):
  304. if not self.run_gvt:
  305. self.GVTdone()
  306. return
  307. time.sleep(0)
  308. kernels = {}
  309. self.locked_kernels = set()
  310. relocation_rules = {}
  311. for model_id in relocate:
  312. source = self.model_ids[model_id].location
  313. destination = relocate[model_id]
  314. if source == destination:
  315. continue
  316. kernels[source] = kernels.get(source, 0) + 1
  317. kernels[destination] = kernels.get(destination, 0) + 1
  318. if kernels[source] == 1:
  319. # We are the first to lock it, so actually send the lock
  320. self.getProxy(source).requestMigrationLock()
  321. if kernels[destination] == 1:
  322. # We are the first to lock it, so actually send the lock
  323. self.getProxy(destination).requestMigrationLock()
  324. relocation_rules.setdefault((source, destination), set()).add(model_id)
  325. while relocation_rules:
  326. # Busy loop until everything is done
  327. # Don't use an iterator, as we will change the list
  328. for source, destination in relocation_rules.keys():
  329. if (source in self.locked_kernels and
  330. destination in self.locked_kernels):
  331. models = relocation_rules[(source, destination)]
  332. self.getProxy(source).migrateTo(destination, models)
  333. del relocation_rules[(source, destination)]
  334. kernels[source] -= len(models)
  335. kernels[destination] -= len(models)
  336. if kernels[source] == 0:
  337. self.getProxy(source).migrationUnlock()
  338. if kernels[destination] == 0:
  339. self.getProxy(destination).migrationUnlock()
  340. # OK, now check whether we need to visualize all locations or not
  341. if self.location_cell_view:
  342. visualizeLocations(self)
  343. # Possibly some node is now hosting all models, so allow this node to become irreversible for some time.
  344. self.checkForTemporaryIrreversible()
  345. # Allow the finishring algorithm again
  346. self.no_finish_ring.release()
  347. def checkForTemporaryIrreversible(self):
  348. """
  349. Checks if one node is hosting all the models. If this is the case, this node will gain 'temporary irreversibility',
  350. allowing it to skip state saving and thus avoiding the main overhead associated with time warp.
  351. """
  352. # Check whether or not everything is located at a single node now
  353. if self.relocator.useLastStateOnly():
  354. # If this is the case, we will be unable to know which state to save the activity for
  355. # So disable it for now
  356. # This does offer a slight negative impact, though it isn't really worth fixing for the time being
  357. return
  358. if isinstance(self.destinations[0], int):
  359. current_kernel = self.destinations[0]
  360. else:
  361. current_kernel = 0
  362. for kernel in self.destinations:
  363. if isinstance(kernel, int):
  364. loc = kernel
  365. else:
  366. loc = 0
  367. if loc != current_kernel:
  368. break
  369. else:
  370. # We didn't break, so one of the nodes runs all at once
  371. self.getProxy(current_kernel).setIrreversible()
  372. self.running_irreversible = current_kernel
  373. def notifyLocked(self, remote):
  374. """
  375. Notify this kernel that the model is locked
  376. :param remote: the node that is locked
  377. """
  378. self.locked_kernels.add(remote)
  379. def dsRemovePort(self, port):
  380. """
  381. Remove a port from the simulation
  382. :param port: the port to remove
  383. """
  384. for iport in port.inline:
  385. iport.outline = [p for p in iport.outline if p != port]
  386. for oport in port.outline:
  387. oport.inline = [p for p in oport.inline if p != port]
  388. self.dc_altered.add(port)
  389. def dsDisconnectPorts(self, p1, p2):
  390. """
  391. Disconnect two ports
  392. :param p1: source port
  393. :param p2: target port
  394. """
  395. self.dc_altered.add(p1)
  396. def dsConnectPorts(self, p1, p2):
  397. """
  398. Connect two ports
  399. :param p1: source port
  400. :param p2: target port
  401. """
  402. self.dc_altered.add(p1)
  403. def dsUnscheduleModel(self, model):
  404. """
  405. Dynamic Structure change: remove an existing model
  406. :param model: the model to remove
  407. """
  408. if isinstance(model, CoupledDEVS):
  409. for m in model.component_set:
  410. self.dsUnscheduleModel(m, False)
  411. for port in model.IPorts:
  412. self.dsRemovePort(port)
  413. for port in model.OPorts:
  414. self.dsRemovePort(port)
  415. elif isinstance(model, AtomicDEVS):
  416. self.model.component_set.remove(model)
  417. self.model.models.remove(model)
  418. # The model is removed, so remove it from the scheduler
  419. self.model.scheduler.unschedule(model)
  420. self.model_ids[model.model_id] = None
  421. self.destinations[model.model_id] = None
  422. self.model.local_model_ids.remove(model.model_id)
  423. for port in model.IPorts:
  424. self.dsRemovePort(port)
  425. for port in model.OPorts:
  426. self.dsRemovePort(port)
  427. else:
  428. raise DEVSException("Unknown model to schedule: %s" % model)
  429. def dsScheduleModel(self, model):
  430. """
  431. Dynamic Structure change: create a new model
  432. :param model: the model to add
  433. """
  434. if isinstance(model, CoupledDEVS):
  435. model.full_name = model.parent.full_name + "." + model.getModelName()
  436. for m in model.component_set:
  437. self.dsScheduleModel(m)
  438. for p in model.IPorts:
  439. self.dc_altered.add(p)
  440. for p in model.OPorts:
  441. self.dc_altered.add(p)
  442. elif isinstance(model, AtomicDEVS):
  443. model.model_id = len(self.model_ids)
  444. model.full_name = model.parent.full_name + "." + model.getModelName()
  445. model.location = self.name
  446. self.model_ids.append(model)
  447. self.destinations.append(model)
  448. self.model.component_set.append(model)
  449. self.model.models.append(model)
  450. self.model.local_model_ids.add(model.model_id)
  451. self.atomicInit(model, self.current_clock)
  452. p = model.parent
  453. model.select_hierarchy = [model]
  454. while p != None:
  455. model.select_hierarchy = [p] + model.select_hierarchy
  456. p = p.parent
  457. if model.time_next[0] == self.current_clock[0]:
  458. # If scheduled for 'now', update the age manually
  459. model.time_next = (model.time_next[0], self.current_clock[1])
  460. # It is a new model, so add it to the scheduler too
  461. self.model.scheduler.schedule(model)
  462. for p in model.IPorts:
  463. self.dc_altered.add(p)
  464. for p in model.OPorts:
  465. self.dc_altered.add(p)
  466. else:
  467. raise DEVSException("Unknown model to schedule: %s" % model)
  468. def setRealTime(self, subsystem, generator_file, ports, scale, listeners, args=[]):
  469. """
  470. Set the use of realtime simulation
  471. :param subsystem: defines the subsystem to use
  472. :param generator_file: filename to use for generating external inputs
  473. :param ports: input port references
  474. :param scale: the scale factor for realtime simulation
  475. :param listeners: the ports on which we should listen for output
  476. :param args: additional arguments for the realtime backend
  477. """
  478. self.realtime = True
  479. self.threading_backend = ThreadingBackend(subsystem, args)
  480. self.rt_zerotime = time.time()
  481. async_gen = AsynchronousComboGenerator(generator_file, self.threading_backend)
  482. self.asynchronous_generator = async_gen
  483. self.realtime_starttime = time.time()
  484. self.portmap = ports
  485. self.model.listeners = listeners
  486. self.realtime_scale = scale
  487. def gameLoop(self):
  488. """
  489. Perform all computations up to the current time. Only applicable for the game loop realtime backend.
  490. """
  491. self.threading_backend.step()
  492. def realtimeInterrupt(self, string):
  493. """
  494. Create an interrupt from other Python code instead of using stdin or the file
  495. :param string: the value to inject
  496. """
  497. self.threading_backend.interrupt(string)
  498. def stateChange(self, model_id, variable, value):
  499. """
  500. Notification function for when a variable's value is altered. It will notify the node that is responsible for simulation of this model AND also notify the tracers of the event.
  501. :param model_id: the model_id of the model whose variable was changed
  502. :param variable: the name of the variable that was changed (as a string)
  503. :param value: the new value of the variable
  504. """
  505. # Call the node that hosts this model and order it to recompute timeAdvance
  506. proxy = self.getProxy(self.model_ids[model_id].location)
  507. proxy.recomputeTA(model_id, self.prev_termination_time)
  508. self.tracers.tracesUser(self.prev_termination_time,
  509. self.model_ids[model_id],
  510. variable,
  511. value)