1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- from pypdevs.DEVS import AtomicDEVS, DEVSException
- from pypdevs.infinity import INFINITY
- from pypdevs.simulator import Simulator
class TrafficLightMode:
    """Mutable holder for the current mode of the traffic light.

    The mode is stored as a plain value (the rest of the file uses the
    strings "red", "green", "yellow" and "manual") and is exposed through
    ``get``/``set`` and ``str()``.
    """

    def __init__(self, current="red"):
        """Create a mode holder that starts out as *current*."""
        self.set(current)

    def set(self, value="red"):
        """Replace the stored mode with *value*."""
        self.__colour = value

    def get(self):
        """Return the stored mode."""
        return self.__colour

    def __str__(self):
        """Return the stored mode as the textual form of this object."""
        return self.get()
class TrafficLight(AtomicDEVS):
    """Atomic DEVS model of a traffic light with a manual override.

    The light cycles red -> green -> yellow -> red on its own. An event on
    the ``INTERRUPT`` input port can switch it to the "manual" state (in
    which it waits indefinitely) or back to autonomous operation, which
    restarts the cycle at red. A value is emitted on the ``OBSERVED``
    output port by ``outputFnc``.
    """

    # Autonomous cycle: current colour -> colour after the internal transition.
    _NEXT_COLOUR = {"red": "green", "green": "yellow", "yellow": "red"}

    # Value that outputFnc puts on OBSERVED for each autonomous colour.
    _OBSERVED_OUTPUT = {"red": "grey", "green": "yellow", "yellow": "grey"}

    # Value returned by timeAdvance for each state; "manual" never times out.
    _TIME_ADVANCE = {"red": 60, "green": 50, "yellow": 10, "manual": INFINITY}

    def __init__(self, name):
        """Create the model called *name*, starting in the "red" state."""
        AtomicDEVS.__init__(self, name)
        self.state = TrafficLightMode("red")
        self.INTERRUPT = self.addInPort(name="INTERRUPT")
        self.OBSERVED = self.addOutPort(name="OBSERVED")

    def extTransition(self, inputs):
        """Return the new state after an event on the INTERRUPT port.

        Only the first event queued on ``INTERRUPT`` is considered.

        * "toManual" moves any known state (a colour or "manual") to
          "manual".
        * "toAutonomous" moves "manual" to "red".

        Raises:
            DEVSException: if the event is not one of the two above, if
                "toAutonomous" arrives while the light is not in manual
                mode, or if the current state is not a known one.
        """
        # Named "event" rather than "input" so the builtin is not shadowed.
        event = inputs[self.INTERRUPT][0]
        state = self.state.get()

        if event == "toManual":
            # Every known state, including "manual" itself, ends up in manual.
            if state in self._TIME_ADVANCE:
                return TrafficLightMode("manual")
            raise DEVSException("Unknown state in TrafficLight")

        if event == "toAutonomous":
            if state == "manual":
                return TrafficLightMode("red")
            # The input is known but not accepted here; say so explicitly
            # instead of reporting it as an unknown input.
            raise DEVSException(
                "Input 'toAutonomous' is not allowed in state '%s' "
                "of TrafficLight" % state
            )

        raise DEVSException("Unknown input in TrafficLight")

    def intTransition(self):
        """Return the next colour of the autonomous cycle.

        Raises:
            DEVSException: if the current state is not red, green or yellow.
        """
        state = self.state.get()
        if state not in self._NEXT_COLOUR:
            raise DEVSException("Unknown state in TrafficLight")
        return TrafficLightMode(self._NEXT_COLOUR[state])

    def outputFnc(self):
        """Return the output bag for the current colour.

        The bag maps the ``OBSERVED`` port to a one-element list.

        Raises:
            DEVSException: if the current state is not red, green or yellow.
        """
        state = self.state.get()
        if state not in self._OBSERVED_OUTPUT:
            raise DEVSException("Unknown state in TrafficLight")
        return {self.OBSERVED: [self._OBSERVED_OUTPUT[state]]}

    def timeAdvance(self):
        """Return how long the model stays in its current state.

        Raises:
            DEVSException: if the current state is not a known one.
        """
        state = self.state.get()
        if state not in self._TIME_ADVANCE:
            raise DEVSException("Unknown state in TrafficLight")
        return self._TIME_ADVANCE[state]
# Build the model and publish its input port under the name "INTERRUPT",
# which is the port name used in the commands typed at the prompt below.
model = TrafficLight(name="trafficLight")
refs = {"INTERRUPT": model.INTERRUPT}

# Configure a real-time simulation with no input file, verbose tracing
# enabled with a None argument, and the thread-based real-time platform.
sim = Simulator(model)
sim.setRealTime(True)
sim.setRealTimeInputFile(None)
sim.setRealTimePorts(refs)
sim.setVerbose(None)
sim.setRealTimePlatformThreads()
sim.simulate()

# Hand every line typed by the user to the simulator as an interrupt.
# Example commands:
#   INTERRUPT toManual
#   INTERRUPT toAutonomous
while True:
    command = input('Enter some input:')
    sim.realtime_interrupt(command)
|