model.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. import sys
  2. sys.path.append("../src/")
  3. from DEVS import CoupledDEVS, AtomicDEVS, RootDEVS, directConnect
  4. from infinity import INFINITY
  5. from collections import defaultdict
  6. from util import allZeroDict, addDict
  7. from statesavers import PickleHighestState as state_saver
  8. from message import NetworkMessage
  9. from messageScheduler import MessageScheduler
  10. class SimulatedCModel(CoupledDEVS):
  11. def __init__(self):
  12. CoupledDEVS.__init__(self, "root")
  13. self.model1 = self.addSubModel(SimulatedModel(1), 0)
  14. self.model2 = self.addSubModel(SimulatedModel(2), 1)
  15. self.model3 = self.addSubModel(SimulatedModel(3), 1)
  16. self.connectPorts(self.model1.outport, self.model2.inport)
  17. self.connectPorts(self.model2.outport, self.model3.inport)
  18. self.connectPorts(self.model3.outport, self.model1.inport)
  19. class ModelState(object):
  20. def __init__(self, value):
  21. self.value = value
  22. self.stateHistory = []
  23. class SimulatedModel(AtomicDEVS):
  24. def __init__(self, name):
  25. AtomicDEVS.__init__(self, str(name))
  26. self.inport = self.addInPort("inport")
  27. self.outport = self.addOutPort("outport")
  28. if name == 1:
  29. self.state = ModelState(2)
  30. else:
  31. self.state = ModelState(None)
  32. def intTransition(self):
  33. #print("INTERNAL TRANSITION @ %s, %s" % (self.getModelFullName(), self.timeLast))
  34. self.state.value = None
  35. self.state.stateHistory.append("INT " + str(self.timeLast))
  36. print("HISTORY of %s: %s" % (self.getModelFullName(), self.state.stateHistory))
  37. return self.state
  38. def extTransition(self, inputs):
  39. #print("EXTERNAL TRANSITION @ %s, %s" % (self.getModelFullName(), self.timeLast))
  40. self.state.value = inputs[self.inport][0]
  41. self.state.stateHistory.append("EXT " + str(self.timeLast))
  42. print("HISTORY of %s: %s" % (self.getModelFullName(), self.state.stateHistory))
  43. return self.state
  44. def timeAdvance(self):
  45. if self.state.value is not None:
  46. return 0.1
  47. else:
  48. return 1.0
  49. #return INFINITY
  50. def outputFnc(self):
  51. return {self.outport: [self.state.value]}
  52. class Cluster(CoupledDEVS):
  53. def __init__(self, nodes):
  54. CoupledDEVS.__init__(self, "Cluster")
  55. self.nodes = [self.addSubModel(Node(i, nodes)) for i in range(nodes)]
  56. self.network = [[self.addSubModel(Network("%i-->%i" % (j, i))) for i in range(nodes)] for j in range(nodes)]
  57. for startid in range(nodes):
  58. for endid in range(nodes):
  59. self.connectPorts(self.nodes[startid].outports[endid], self.network[startid][endid].inport)
  60. self.connectPorts(self.network[startid][endid].outport, self.nodes[endid].inports[startid])
  61. class NodeState(object):
  62. def __init__(self, name, totalsize):
  63. self.simulationtime = (0, 0)
  64. self.prevtime = (0, 0)
  65. self.terminationtime = (3, 0)
  66. model = SimulatedCModel()
  67. self.model_ids = []
  68. locations = defaultdict(list)
  69. model.finalize(name="", model_counter=0, model_ids=self.model_ids, locations=locations, selectHierarchy=[])
  70. if isinstance(model, CoupledDEVS):
  71. model.componentSet = directConnect(model.componentSet, True)
  72. self.destinations = [None] * len(model.componentSet)
  73. self.kernels = len(locations.keys())
  74. local = []
  75. for m in model.componentSet:
  76. self.destinations[m.model_id] = m if m.location == name else m.location
  77. if m.location == name:
  78. m.timeNext = (m.timeAdvance(), 1)
  79. m.timeLast = (0, 0)
  80. m.oldStates = [state_saver(m.timeLast, m.timeNext, m.state, 0.0, {}, 0.0)]
  81. local.append(m)
  82. self.model = RootDEVS(local, model.componentSet, ("schedulerML", "SchedulerML"))
  83. self.model.setScheduler(self.model.schedulerType)
  84. self.model.setTimeNext()
  85. self.externalQueue = {}
  86. self.color = False
  87. self.sendmsgcounter = 0
  88. self.outputQueue = []
  89. self.messageScheduler = MessageScheduler()
  90. self.V = [{}, {}, {}, {}]
  91. self.Tmin = float('inf')
  92. self.blockOutgoing = None
  93. self.run_GVT = 1.0
  94. self.gvt_check = None
  95. self.GVT = -float('inf')
  96. self.relocation_rules = None
  97. self.kernels_to_relocate = None
  98. from manualRelocator import ManualRelocator
  99. self.relocator = ManualRelocator()
  100. self.relocator.addDirective(1.0, 1, 0)
  101. self.locked = False
  102. self.accumulator = {}
  103. def copy(self):
  104. #TODO keep this up to date
  105. import cPickle
  106. a = cPickle.loads(cPickle.dumps(self))
  107. a.model = self.model
  108. a.model_ids = list(self.model_ids)
  109. a.destinations = list(self.destinations)
  110. a.externalQueue = dict(self.externalQueue)
  111. a.outputQueue = list(self.outputQueue)
  112. return a
  113. def __getstate__(self):
  114. retdict = {}
  115. for i in dir(self):
  116. if getattr(self, i).__class__.__name__ in ["instancemethod", "method-wrapper", "builtin_function_or_method"]:
  117. continue
  118. elif str(i).startswith("__"):
  119. continue
  120. retdict[str(i)] = getattr(self, i)
  121. return retdict
  122. def __setstate__(self, inp):
  123. for i in inp:
  124. setattr(self, i, inp[i])
  125. class Node(AtomicDEVS):
  126. def __init__(self, name, totalsize):
  127. AtomicDEVS.__init__(self, "Node_%i" % name)
  128. self.nodename = name
  129. self.totalsize = totalsize
  130. self.inports = [self.addInPort("inport_%i" % i) for i in range(totalsize)]
  131. self.outports = [self.addOutPort("outport_%i" % i) for i in range(totalsize)]
  132. self.state = NodeState(name, totalsize)
  133. def genUUID(self):
  134. self.state.sendmsgcounter += 1
  135. return "%s-%s" % (self.nodename, self.state.sendmsgcounter)
  136. def send(self, model_id, timestamp, content):
  137. if self.state.blockOutgoing == timestamp:
  138. return
  139. msg = NetworkMessage(timestamp, content, self.genUUID(), self.state.color, model_id)
  140. self.state.outputQueue.append(msg)
  141. self.notifySend(self.state.destinations[model_id], timestamp[0], msg.color)
  142. self.state.externalQueue.setdefault(self.outports[self.state.destinations[model_id]], []).append(msg)
  143. def processMessage(self, clock):
  144. try:
  145. message = self.state.messageScheduler.readFirst()
  146. except IndexError:
  147. # No input messages
  148. return clock
  149. if message.timestamp < clock:
  150. # The message is sent before the timenext, so update the clock
  151. clock = message.timestamp
  152. try:
  153. while (abs(clock[0] - message.timestamp[0]) < 1e-6 and (clock[1] == message.timestamp[1])):
  154. print("Process message with UUID " + str(message.uuid))
  155. for port in message.content:
  156. port.hostDEVS.myInput.setdefault(port, []).extend(message.content[port])
  157. self.state.transitioning[port.hostDEVS] |= 2
  158. self.state.messageScheduler.removeFirst()
  159. message = self.state.messageScheduler.readFirst()
  160. except IndexError:
  161. # At the end of the scheduler, so we are done
  162. pass
  163. return clock
  164. def receiveControl(self, msg, first=False):
  165. self.state.controlmsg = msg
  166. m_clock = msg[0]
  167. m_send = msg[1]
  168. waiting_vector = msg[2]
  169. accumulating_vector = msg[3]
  170. color = self.state.color
  171. finished = (self.nodename == 0 and not first and (color == 0 or color == 2))
  172. if self.nodename == 0 and not first:
  173. if not allZeroDict(waiting_vector):
  174. raise DEVSException("GVT bug detected")
  175. waiting_vector = accumulating_vector
  176. accumulating_vector = {}
  177. if finished:
  178. from math import floor
  179. GVT = floor(min(m_clock, m_send))
  180. self.state.accumulator = waiting_vector
  181. self.state.externalQueue.setdefault(self.outports[self.nodename], []).append(("setGVT_local", [GVT, [], self.state.relocator.useLastStateOnly()]))
  182. return None
  183. else:
  184. return self.tryIfOk(color, waiting_vector, accumulating_vector)
  185. """
  186. if self.state.color == 0 or self.state.color == 2:
  187. # We are currently white, about to turn red
  188. if self.nodename == 0 and not first:
  189. # The controller received the message that went around completely
  190. # The count != check is needed to distinguish between init and finish
  191. # So we are finished now, don't update the color here!!
  192. if not allZeroDict(count):
  193. raise DEVSException("GVT bug detected")
  194. # Perform some rounding to prevent slight deviations due to floating point errors
  195. from math import floor
  196. GVT = floor(min(m_clock, m_send))
  197. print("Found GVT " + str(GVT))
  198. # Do this with a proxy to make it async
  199. self.state.externalQueue.setdefault(self.outports[self.nodename], []).append(("setGVT_local", [GVT, [], self.state.relocator.useLastStateOnly()]))
  200. else:
  201. # Either at the controller at init
  202. # or just a normal node that is about to turn red
  203. self.state.color = (self.state.color + 1) % 4
  204. addDict(count, self.state.V[v])
  205. self.state.V[v] = {}
  206. msg = [min(m_clock, self.state.prevtime[0]), min(m_send, self.state.Tmin), count]
  207. self.state.externalQueue.setdefault(self.outports[(self.nodename+1)%self.totalsize], []).append(("receiveControl", [msg]))
  208. return None
  209. elif self.state.color == 1 or self.state.color == 3:
  210. # We are currently red, about to turn white
  211. # First wait for all messages in the medium
  212. return self.tryIfOk(v, count)
  213. """
  214. def findAndPerformRelocations(self, GVT, activities, horizon):
  215. relocate = self.state.relocator.getRelocations(GVT, activities, horizon)
  216. relocate = {key: relocate[key] for key in relocate if self.state.model_ids[key].location != relocate[key] and self.state.model_ids[key].relocatable}
  217. if not relocate:
  218. self.state.run_GVT = 1.0
  219. return
  220. kernels = {}
  221. self.state.locked_kernels = set()
  222. relocation_rules = {}
  223. for model_id in relocate:
  224. source = self.state.model_ids[model_id].location
  225. destination = relocate[model_id]
  226. if source == destination:
  227. continue
  228. kernels[source] = kernels.get(source, 0) + 1
  229. kernels[destination] = kernels.get(destination, 0) + 1
  230. if kernels[source] == 1:
  231. # We are the first to lock it, so actually send the lock
  232. self.state.externalQueue.setdefault(self.outports[source], []).append(("requestMigrationLock", []))
  233. #self.getProxy(source).requestMigrationLock()
  234. if kernels[destination] == 1:
  235. # We are the first to lock it, so actually send the lock
  236. self.state.externalQueue.setdefault(self.outports[destination], []).append(("requestMigrationLock", []))
  237. #self.getProxy(destination).requestMigrationLock()
  238. relocation_rules.setdefault((source, destination), set()).add(model_id)
  239. self.performRelocations(relocation_rules, kernels)
  240. def performRelocations(self, relocation_rules, kernels):
  241. for source, destination in relocation_rules.keys():
  242. if source in self.state.locked_kernels and destination in self.state.locked_kernels:
  243. models = relocation_rules[(source, destination)]
  244. unlock = []
  245. if kernels[source] == 1:
  246. unlock.append(source)
  247. if kernels[destination] == 1:
  248. unlock.append(destination)
  249. self.state.externalQueue.setdefault(self.outports[source], []).append(("migrateTo", [destination, models, unlock]))
  250. #self.getProxy(source).migrateTo(destination, models)
  251. del relocation_rules[(source, destination)]
  252. kernels[source] -= len(models)
  253. kernels[destination] -= len(models)
  254. if relocation_rules:
  255. # Still something to do
  256. self.state.relocation_rules = relocation_rules
  257. self.state.kernels_to_relocate = kernels
  258. else:
  259. # At the end, so a normal return
  260. self.state.relocation_rules = None
  261. self.state.kernels_to_relocate = None
  262. def setGVT_local(self, GVT, activities, lastStateOnly):
  263. if GVT < self.state.GVT:
  264. raise DEVSException("GVT cannot decrease from " + str(self.GVT) + " to " + str(GVT) + "!")
  265. if GVT == self.state.GVT:
  266. # At the controller too
  267. # Restart the GVT algorithm within 1 time unit
  268. if activities:
  269. if self.state.oldGVT == -float('inf'):
  270. self.oldGVT = 0.
  271. horizon = self.state.GVT - self.state.oldGVT
  272. self.findAndPerformRelocations(GVT, activities, horizon)
  273. else:
  274. self.state.oldGVT = self.state.GVT
  275. self.state.GVT = GVT
  276. nqueue = []
  277. self.state.messageScheduler.cleanup((GVT, 1))
  278. #self.performActions(GVT)
  279. found = False
  280. for index in range(len(self.state.outputQueue)):
  281. if self.state.outputQueue[index].timestamp[0] >= GVT:
  282. found = True
  283. self.state.outputQueue = self.state.outputQueue[index:]
  284. break
  285. if not found:
  286. self.state.outputQueue = []
  287. self.state.activities = {}
  288. self.state.model.setGVT(GVT, self.state.activities, lastStateOnly)
  289. if lastStateOnly:
  290. activitySum = 0
  291. else:
  292. activitySum = sum(self.state.activities.values())
  293. activities.append((self.name, activitySum))
  294. self.state.externalQueue.setdefault(self.outports[(self.nodename+1)%self.totalsize], []).append(("setGVT_local", [GVT, activities, lastStateOnly]))
  295. def tryIfOk(self, color, waiting_vector, accumulating_vector):
  296. prevcolor = 3 if color == 0 else color - 1
  297. if self.state.V[prevcolor].get(self.nodename, 0) + self.state.controlmsg[2].get(self.nodename, 0) <= 0:
  298. addDict(waiting_vector, self.state.V[prevcolor])
  299. addDict(accumulating_vector, self.state.V[color])
  300. self.state.V[prevcolor] = {}
  301. self.state.V[color] = {}
  302. ntime = self.state.prevtime[0] if self.nodename == 0 else min(self.state.controlmsg[0], self.state.prevtime[0])
  303. msg = [ntime, min(self.state.controlmsg[1], self.state.Tmin), waiting_vector, accumulating_vector]
  304. self.state.Tmin = float('inf')
  305. self.state.externalQueue.setdefault(self.outports[(self.nodename+1)%self.totalsize], []).append(("receiveControl", [msg]))
  306. self.state.color = (self.state.color + 1) % 4
  307. return False
  308. else:
  309. return color, waiting_vector, accumulating_vector
  310. def activateModel(self, model_id, currentState):
  311. new_model = self.state.model_ids[model_id]
  312. old_location = new_model.location
  313. new_model.location = self.nodename
  314. self.state.model.componentSet.append(new_model)
  315. self.state.model.local_model_ids.add(new_model.model_id)
  316. new_model.timeLast = currentState[0]
  317. new_model.timeNext = currentState[1]
  318. new_model.state = currentState[2]
  319. new_model.oldStates = [state_saver(new_model.timeLast, new_model.timeNext, new_model.state, 0.0, {}, 0.0)]
  320. # It is a new model, so add it to the scheduler too
  321. self.state.model.scheduler.schedule(new_model)
  322. self.state.destinations[model_id] = new_model
  323. self.state.model.setTimeNext()
  324. self.state.activities[model_id] = 0.0
  325. def messageTransfer(self, extraction):
  326. self.state.messageScheduler.insert(extraction, self.state.model_ids)
  327. def migrateTo(self, destination, model_ids, unlock):
  328. # Assumes that the simlock is already acquired
  329. # Make sure that the model that we are migrating is local here
  330. #assert info("Migrating " + str(model_ids) + " to " + str(destination))
  331. models = set()
  332. for model_id in model_ids:
  333. if isinstance(self.state.destinations[model_id], int):
  334. raise DEVSException("Cannot migrate model that is not local to the source!")
  335. if not self.state.destinations[model_id].relocatable:
  336. raise DEVSException("Model %s was marked as fixed and is therefore not allowed to be relocated" % self.state.destinations[model_id].getModelFullName())
  337. models.add(self.state.destinations[model_id])
  338. destination = int(destination)
  339. if destination == self.name:
  340. # Model is already there...
  341. return
  342. #assert info("Migration approved of %s from node %d to node %d" % (model_ids, self.name, destination))
  343. for model in models:
  344. # All models are gone here, so remove them from the scheduler
  345. self.state.model.scheduler.unschedule(model)
  346. for i in range(self.state.kernels):
  347. if i != destination and i != self.name:
  348. self.state.externalQueue.setdefault(self.outports[i], []).append(("notifyMigration", [model_ids, destination]))
  349. #self.getProxy(i).notifyMigration(model_ids, destination)
  350. self.state.externalQueue.setdefault(self.outports[destination], []).append(("messageTransfer", [self.state.messageScheduler.extract(model_ids)]))
  351. #remote.messageTransfer(self.inputScheduler.extract(model_ids))
  352. for model in models:
  353. # No need to ask the new node whether or not there are specific nodes that also have to be informed
  354. self.state.externalQueue.setdefault(self.outports[destination], []).append(("activateModel", [model.model_id, (model.timeLast, model.timeNext, model.state)]))
  355. #remote.activateModel(model.model_id, (model.timeLast, model.timeNext, model.state))
  356. # Delete our representation of the model
  357. model.state = None
  358. model.oldStates = []
  359. del self.state.activities[model.model_id]
  360. for m in unlock:
  361. self.state.externalQueue.setdefault(self.outports[m], []).append(("migrationUnlock", []))
  362. # Remove the model from the componentSet of the RootDEVS
  363. self.state.model.componentSet = [m for m in self.state.model.componentSet if m not in models]
  364. for model_id in model_ids:
  365. self.state.model.local_model_ids.remove(model_id)
  366. self.state.destinations[model_id] = destination
  367. self.state.model_ids[model_id].location = destination
  368. # Now update the timeNext and timeLast values here
  369. self.state.model.setTimeNext()
  370. def notifyMigration(self, model_ids, destination):
  371. if destination == self.nodename:
  372. # No need to notify ourselves, simply here for safety as it shouldn't be called
  373. return
  374. for model_id in model_ids:
  375. self.state.destinations[model_id] = destination
  376. self.state.model_ids[model_id].location = destination
  377. def requestMigrationLock(self):
  378. self.state.locked = True
  379. self.revert_local((self.state.GVT, 0))
  380. self.state.externalQueue.setdefault(self.outports[0], []).append(("notifyLocked", [self.nodename]))
  381. def migrationUnlock(self):
  382. self.state.locked = False
  383. def notifyLocked(self, name):
  384. self.state.locked_kernels.add(name)
  385. def intTransition(self):
  386. # Just do some processing
  387. self.state.run_GVT -= self.timeAdvance()
  388. self.state.externalQueue = {}
  389. self.state.transitioning = defaultdict(int)
  390. if self.state.run_GVT <= 0 and self.nodename == 0:
  391. # Start the GVT algorithm
  392. self.receiveControl([float('inf'), float('inf'), self.state.accumulator, {}], True)
  393. self.state.run_GVT = float('inf')
  394. if self.state.gvt_check is not None:
  395. rv = self.tryIfOk(*self.state.gvt_check)
  396. if not isinstance(rv, tuple):
  397. self.state.gvt_check = None
  398. if self.state.relocation_rules is not None:
  399. self.performRelocations(self.state.relocation_rules, self.state.kernels_to_relocate)
  400. return self.state
  401. if self.state.locked:
  402. return self.state
  403. ctime = self.processMessage(self.state.model.timeNext)
  404. if ctime > self.state.terminationtime:
  405. self.state.simulationtime = ctime
  406. return self.state
  407. outputs = {}
  408. transitioning = self.state.model.scheduler.getImminent(ctime)
  409. for i in transitioning:
  410. outputs.update(i.outputFnc())
  411. self.state.transitioning[i] |= 1
  412. remotes = {}
  413. for i in outputs:
  414. for dest in i.outLine:
  415. destADEVS = dest.hostDEVS
  416. if destADEVS.location == self.nodename:
  417. destADEVS.myInput.setdefault(dest, []).extend(outputs[i])
  418. self.state.transitioning[destADEVS] |= 2
  419. else:
  420. remotes.setdefault(destADEVS.model_id, {}).setdefault(dest.port_id, []).extend(outputs[i])
  421. for destination in remotes:
  422. self.send(destination, ctime, remotes[destination])
  423. for aDEVS in self.state.transitioning:
  424. t = self.state.transitioning[aDEVS]
  425. aDEVS.timeLast = ctime
  426. activityTrackingPreValue = aDEVS.preActivityCalculation()
  427. if t == 1:
  428. aDEVS.state = aDEVS.intTransition()
  429. elif t == 2:
  430. aDEVS.elapsed = ctime[0] - aDEVS.timeLast[0]
  431. aDEVS.state = aDEVS.extTransition(aDEVS.myInput)
  432. elif t == 3:
  433. aDEVS.state = aDEVS.confTransition(aDEVS.myInput)
  434. ta = aDEVS.timeAdvance()
  435. aDEVS.timeNext = (aDEVS.timeLast[0] + ta, 1 if ta != 0 else aDEVS.timeLast[1] + 1)
  436. aDEVS.oldStates.append(state_saver(aDEVS.timeLast, aDEVS.timeNext, aDEVS.state, aDEVS.postActivityCalculation(activityTrackingPreValue), {}, 0))
  437. aDEVS.myInput = {}
  438. self.state.model.scheduler.massReschedule(self.state.transitioning.keys())
  439. self.state.prevtime = ctime
  440. self.state.model.setTimeNext()
  441. self.state.simulationtime = self.state.model.timeNext
  442. return self.state
  443. def notifyReceive(self, color):
  444. self.state.V[color][self.nodename] = self.state.V[color].get(self.nodename, 0) - 1
  445. def notifySend(self, destination, timestamp, color):
  446. self.state.V[color][destination] = self.state.V[color].get(destination, 0) + 1
  447. if color == 1 or color == 3:
  448. self.state.Tmin = min(self.state.Tmin, timestamp)
  449. def revert_local(self, time):
  450. self.state.messageScheduler.revert(time)
  451. self.state.model.revert(time, False)
  452. self.state.model.setTimeNext()
  453. self.state.prevtime = time
  454. self.state.simulationtime = (0, 0)
  455. # Invalidate all output messages after or at time
  456. end = -1
  457. unschedules = {}
  458. unschedules_mintime = {}
  459. print("Reverting to time " + str(time))
  460. for index, value in enumerate(self.state.outputQueue):
  461. # Do not invalidate messages at this time itself, as they are processed in this time step and not generated in this timestep
  462. if value.timestamp > time:
  463. model_id = value.destination
  464. unschedules_mintime[model_id] = min(unschedules_mintime.get(model_id, (float('inf'), 0)), value.timestamp)
  465. unschedules.setdefault(model_id, []).append(value.uuid)
  466. else:
  467. #assert debug("NOT invalidating " + str(value.uuid))
  468. end = index
  469. self.state.outputQueue = self.state.outputQueue[:end+1]
  470. try:
  471. self.state.blockOutgoing = self.state.outputQueue[-1].timestamp
  472. except IndexError:
  473. self.state.blockOutgoing = None
  474. # Don't need the Vlock here, as we already have it
  475. for model_id in unschedules:
  476. dest_kernel = self.state.destinations[model_id]
  477. if not isinstance(dest_kernel, int):
  478. raise DEVSException("Impossible")
  479. continue
  480. mintime = unschedules_mintime[model_id]
  481. # Assume we have the simlock already
  482. self.state.externalQueue.setdefault(self.outports[dest_kernel], []).append(("receiveAntiMessage", [mintime, model_id, unschedules[model_id], self.state.color]))
  483. self.notifySend(dest_kernel, mintime[0], self.state.color)
  484. def extTransition(self, inputs):
  485. self.state.run_GVT -= self.elapsed
  486. for port in inputs:
  487. for msg in inputs[port]:
  488. if isinstance(msg, NetworkMessage):
  489. self.notifyReceive(msg.color)
  490. if msg.destination not in self.state.model.local_model_ids:
  491. print("FORWARD MSG " + str(msg.uuid))
  492. dest = self.state.destinations[msg.destination]
  493. msg.color = self.state.color
  494. self.notifySend(dest, msg.timestamp[0], msg.color)
  495. self.state.externalQueue.setdefault(self.outports[dest], []).append(msg)
  496. continue
  497. msg.content = {self.state.model_ids[msg.destination].ports[port]: msg.content[port] for port in msg.content}
  498. if msg.timestamp <= self.state.prevtime:
  499. self.revert_local(msg.timestamp)
  500. self.state.messageScheduler.schedule(msg)
  501. elif isinstance(msg, tuple):
  502. # Other kind of message
  503. action, args = msg
  504. if action == "receiveControl":
  505. rv = getattr(self, action)(*args)
  506. if isinstance(rv, tuple):
  507. # Try again later
  508. self.state.gvt_check = rv
  509. else:
  510. self.state.gvt_check = None
  511. else:
  512. getattr(self, action)(*args)
  513. # Put the return values in a queue if necessary
  514. self.state.simulationtime = (0, 0)
  515. return self.state
  516. def receiveAntiMessage(self, time, model_id, uuids, color):
  517. self.notifyReceive(color)
  518. print("Received anti message for uuids " + str(uuids))
  519. if model_id not in self.state.model.local_model_ids and model_id is not None:
  520. print("FORWARD ANTIMSG")
  521. self.state.externalQueue.setdefault(self.outports[self.state.destinations[model_id]], []).append(("receiveAntiMessages", [mintime, model_id, uuids, self.state.color]))
  522. self.notifySend(self.state.destinations[model_id], mintime[0], self.state.color)
  523. return
  524. if time <= self.state.prevtime:
  525. self.revert_local(time)
  526. self.state.messageScheduler.massUnschedule(uuids)
  527. def timeAdvance(self):
  528. if self.state.externalQueue:
  529. return 0.01
  530. elif self.state.simulationtime < self.state.terminationtime:
  531. return 0.1
  532. else:
  533. return INFINITY
  534. def outputFnc(self):
  535. return self.state.externalQueue
  536. class NetworkState(object):
  537. def __init__(self):
  538. self.lst = []
  539. def copy(self):
  540. a = NetworkState()
  541. a.lst = list(self.lst)
  542. return a
  543. class Network(AtomicDEVS):
  544. def __init__(self, name):
  545. AtomicDEVS.__init__(self, name)
  546. self.state = NetworkState()
  547. self.inport = self.addInPort("inport")
  548. self.outport = self.addOutPort("outport")
  549. def intTransition(self):
  550. self.state.lst = []
  551. return self.state
  552. def extTransition(self, inputs):
  553. msgs = inputs[self.inport]
  554. self.state.lst.extend(msgs)
  555. return self.state
  556. def timeAdvance(self):
  557. if self.state.lst:
  558. #return 1.0
  559. return 0.1
  560. #return 0.01
  561. else:
  562. return INFINITY
  563. def outputFnc(self):
  564. return {self.outport: self.state.lst}