#
# Constraints.py
#
#################

from Traffic_Car import *
from Traffic_Constraint import *


def constraintGreaterThanZero(constraint):
    """Check that a constraint allows more than zero cars.

    Returns None when ``constraint.max_cars`` is positive, otherwise a
    ``(message, constraint)`` tuple describing the violation.
    """
    if constraint.max_cars.getValue() <= 0:
        return ("Max cars of constraint cannot be less or equal to 0 !", constraint)
    return None


def periodGreaterThanZero(trafficlight):
    """Check that the red, orange and green periods are all positive.

    Returns None when all three periods are greater than zero, otherwise a
    ``(message, trafficlight)`` tuple describing the violation.
    """
    periods = (
        trafficlight.period_red,
        trafficlight.period_orange,
        trafficlight.period_green,
    )
    # any() short-circuits in the same red/orange/green order as an ``or``.
    if any(period.getValue() <= 0 for period in periods):
        return ("Traffic light period cannot be less or equal to 0 !", trafficlight)
    return None


def generatorNotLessThanZero(gen):
    """Check that a generator is not asked to generate a negative amount.

    Returns None when ``gen.cars_to_generate`` is zero or more, otherwise a
    ``(message, gen)`` tuple describing the violation.
    """
    if gen.cars_to_generate.getValue() < 0:
        return ("Generator cannot generate less than 0 cars!", gen)
    return None


def getRoadFromCar(car):
    """Return the node behind the car's first outgoing link.

    Follows ``car.out_connections_[0]`` and then that link's
    ``out_connections_[0]``.  Returns None when either list is empty.
    """
    # Compare by value: ``is not 0`` tests object identity and only works by
    # accident of small-integer caching.
    if len(car.out_connections_) != 0:
        link = car.out_connections_[0]
        if len(link.out_connections_) != 0:
            return link.out_connections_[0]
    return None


def getConstraintsFromRoad(road):
    """Return all Traffic_Constraint nodes linked into ``road``.

    Walks ``road.in_connections_`` and, for each link, its
    ``in_connections_``, keeping the objects that are Traffic_Constraint
    instances.  The result is a (possibly empty) list.
    """
    constraints = []
    for link in road.in_connections_:
        for obj in link.in_connections_:
            if isinstance(obj, Traffic_Constraint):
                constraints.append(obj)
    return constraints


def checkMaxCars(constraint, carBeforeConnected=None):
    """Check that a constraint's ``max_cars`` limit is not exceeded.

    Counts the Traffic_Car instances found by walking
    constraint -> outgoing link -> node -> incoming link -> source object.

    ``carBeforeConnected`` is an optional car that is counted once up front
    (the count starts at 1) and skipped if it is met during the walk.

    Returns None when the count is within ``constraint.max_cars``, otherwise
    a ``(message, constraint)`` tuple describing the violation.
    """
    cars = 0
    if carBeforeConnected is not None:
        cars = 1
    for link in constraint.out_connections_:
        for obj in link.out_connections_:
            for link2 in obj.in_connections_:
                for obj2 in link2.in_connections_:
                    if isinstance(obj2, Traffic_Car) and obj2 is not carBeforeConnected:
                        cars = cars + 1
    if cars > constraint.max_cars.getValue():
        return ("Constraint not satisfied, too many cars!", constraint)
    return None


def checkMaxCarsFromCar(car):
    """Check the constraints of the road a car is connected to.

    Returns None when the car has no road or no constraint is violated,
    otherwise the ``(message, constraint)`` tuple of the first violation.
    """
    road = getRoadFromCar(car)
    if road is None:
        return None
    return checkMaxCarsFromRoad(road)


def checkMaxCarsFromRoad(road, carBeforeConnected=None):
    """Check every constraint linked into ``road``.

    ``carBeforeConnected`` is forwarded to checkMaxCars for each constraint.

    Returns the first non-None result of checkMaxCars, or None when every
    constraint is satisfied (or the road has no constraints).
    """
    for constraint in getConstraintsFromRoad(road):
        result = checkMaxCars(constraint, carBeforeConnected)
        if result is not None:
            return result
    return None


#
# Actions.py
#
#############

import time
import threading
import random
from ATOM3Enum import *
from Traffic_MM import *
from Constraints import checkMaxCarsFromRoad


def moveCarToRoad(car):
    """Move the car's graphical object next to the road it is linked to.

    The car is moved by the distance between the road's and the car's
    graph objects plus a fixed offset of (15, 20).  The car must have an
    outgoing link that itself has an outgoing connection; no check is made.
    """
    epsX = 15
    epsY = 20
    carGraphObj = car.graphObject_
    link = car.out_connections_[0]
    road = link.out_connections_[0]
    roadGraphObj = road.graphObject_
    deltaX = roadGraphObj.x - carGraphObj.x
    deltaY = roadGraphObj.y - carGraphObj.y
    carGraphObj.Move(deltaX + epsX, deltaY + epsY)


def initMovePortsAndCars(road):
    """Store the road's current position as ``oldx``/``oldy``.

    movePortsAndCars reads these values to compute how far the road moved.
    """
    roadGraphObj = road.graphObject_
    roadGraphObj.oldx = roadGraphObj.x
    roadGraphObj.oldy = roadGraphObj.y


def movePortsAndCars(road):
    """Move the objects attached to ``road`` by the distance the road moved.

    The distance is the road's current position minus the ``oldx``/``oldy``
    stored by initMovePortsAndCars.  Moved are: every outgoing link and its
    targets, every incoming link and its sources except Traffic_Constraint
    nodes, and any Traffic_TrafficLight (with its link) connected into one
    of those sources.
    """
    # NOTE(review): the nesting below was reconstructed from a
    # whitespace-collapsed source; confirm that the traffic-light search is
    # meant to run only for non-constraint objects.
    roadGraphObj = road.graphObject_
    deltaX = roadGraphObj.x - roadGraphObj.oldx
    deltaY = roadGraphObj.y - roadGraphObj.oldy
    for outc in road.out_connections_:
        outc.graphObject_.Move(deltaX, deltaY)
        for obj in outc.out_connections_:
            obj.graphObject_.Move(deltaX, deltaY)
    for inc in road.in_connections_:
        inc.graphObject_.Move(deltaX, deltaY)
        for obj in inc.in_connections_:
            if not isinstance(obj, Traffic_Constraint):
                obj.graphObject_.Move(deltaX, deltaY)
                for inc2 in obj.in_connections_:
                    for obj2 in inc2.in_connections_:
                        if isinstance(obj2, Traffic_TrafficLight):
                            obj2.graphObject_.Move(deltaX, deltaY)
                            inc2.graphObject_.Move(deltaX, deltaY)


def getRoadFromCar(car):
    """Return the node behind the car's first outgoing link.

    Follows ``car.out_connections_[0]`` and then that link's
    ``out_connections_[0]``.  Returns None when either list is empty.
    """
    # Compare by value: ``is not 0`` tests object identity and only works by
    # accident of small-integer caching.
    if len(car.out_connections_) != 0:
        link = car.out_connections_[0]
        if len(link.out_connections_) != 0:
            return link.out_connections_[0]
    return None


def getTrafficLightFromRoad(road):
    """Return the first Traffic_TrafficLight found two hops before ``road``.

    Walks road -> incoming link -> source object -> incoming link -> source
    object and returns the first Traffic_TrafficLight met, or None.
    """
    for link in road.in_connections_:
        for obj in link.in_connections_:
            for link2 in obj.in_connections_:
                for obj2 in link2.in_connections_:
                    if isinstance(obj2, Traffic_TrafficLight):
                        return obj2
    return None


def resetTrafficLightTimer(trafficlight):
    """Set the light's timer to the period of its currently selected colour.

    The selected index of ``trafficlight.color`` maps 0 -> ``period_red``,
    1 -> ``period_orange``, 2 -> ``period_green``.  Any other index leaves
    the timer untouched.
    """
    [colors, selected] = trafficlight.color.getValue()
    if selected == 0:
        trafficlight.timer.setValue(trafficlight.period_red.getValue())
    elif selected == 1:
        trafficlight.timer.setValue(trafficlight.period_orange.getValue())
    elif selected == 2:
        trafficlight.timer.setValue(trafficlight.period_green.getValue())


def initAllTrafficLightTimers(atom3i):
    """Reset the timer of every Traffic_TrafficLight node in the model."""
    trafficlights = atom3i.ASGroot.listNodes['Traffic_TrafficLight']
    for trafficlight in trafficlights:
        resetTrafficLightTimer(trafficlight)


def initGenerator(gen):
    """Set the generator's ``remaining_cars`` to its ``cars_to_generate``."""
    gen.remaining_cars.setValue(gen.cars_to_generate.getValue())


def initAllGenerators(atom3i):
    """Initialise every Traffic_Generator node in the model."""
    gens = atom3i.ASGroot.listNodes['Traffic_Generator']
    for gen in gens:
        initGenerator(gen)


def updateRoadCounter(car):
    """Increment the ``cars`` counter of the road the car is connected to.

    Also refreshes the road's displayed 'cars' attribute.  Does nothing
    when the car has no road.
    """
    road = getRoadFromCar(car)
    if road is not None:
        road.cars.setValue(road.cars.getValue() + 1)
        road.graphObject_.ModifyAttribute('cars', road.cars.getValue())


def getNextRoads(road):
    """Return the nodes a car on ``road`` could move to next.

    Walks road -> link -> out port -> link -> object.  A Traffic_Sink object
    is itself a candidate; for every object the nodes behind its outgoing
    links are candidates as well.  The result is a (possibly empty) list.
    """
    # NOTE(review): nesting reconstructed from a whitespace-collapsed
    # source; confirm the link3 loop runs for every object and not only for
    # sinks.
    nextroads = []
    for link in road.out_connections_:
        for outport in link.out_connections_:
            for link2 in outport.out_connections_:
                for obj in link2.out_connections_:
                    if isinstance(obj, Traffic_Sink):
                        nextroads.append(obj)
                    for link3 in obj.out_connections_:
                        for nextroad in link3.out_connections_:
                            nextroads.append(nextroad)
    return nextroads


def pickNextRoad(nextroads):
    """Choose the road to continue on from a list of candidates.

    Two candidates: one is picked at random.  One candidate: it is returned.
    Any other number of candidates yields None.
    """
    nextroad = None
    # Two next roads found, the road is a splitter
    if len(nextroads) == 2:
        random.seed(None)
        rand = random.randint(0, 1)
        nextroad = nextroads[rand]
    # One next road found, the road is a roadsegment or a merger
    elif len(nextroads) == 1:
        nextroad = nextroads[0]
    return nextroad


def removeCar(car, atom3i):
    """Erase the car's graphics and, if present, its first outgoing link.

    The link's graphics are erased and the link node is removed via
    ``removeNode()``.
    """
    car.graphObject_.erase(atom3i)
    if len(car.out_connections_) != 0:
        link = car.out_connections_[0]
        link.graphObject_.erase(atom3i)
        link.removeNode()


def connectToNextRoad(car, nextroad, atom3i):
    """Attach ``car`` to ``nextroad``, or retire it when that is a sink.

    For a Traffic_Sink the car is removed and the sink's ``cars_entered``
    counter (and its displayed attribute) is incremented.  Otherwise the
    car's current outgoing link, if any, is erased and removed, and a new
    car-to-segment link is created and drawn between car and road.
    """
    # If the next road is a sink -> remove the car
    if isinstance(nextroad, Traffic_Sink):
        removeCar(car, atom3i)
        nextroad.cars_entered.setValue(nextroad.cars_entered.getValue() + 1)
        nextroad.graphObject_.ModifyAttribute('cars_entered', nextroad.cars_entered.getValue())
    else:
        if len(car.out_connections_) != 0:
            link = car.out_connections_[0]
            link.graphObject_.erase(atom3i)
            link.removeNode()
        newlink = createNewTraffic_FromCar2Segment(atom3i, 0, 0)
        atom3i.drawConnections((newlink, nextroad, [0.0, 0.0, 0.0, 0.0], "true", 2))
        atom3i.drawConnections((car, newlink, [0.0, 0.0, 0.0, 0.0], "true", 2))
def moveCar(car, road, atom3i):
    """Try to move ``car`` from ``road`` to a following road.

    When there is no following road the car is removed.  Otherwise the car
    is connected to the chosen road only if no constraint on that road would
    be violated; in that case ``road``'s ``cars`` counter and its displayed
    attribute are decremented.

    Returns True when the car was removed or moved, False when a constraint
    blocked the move.
    """
    nextroads = getNextRoads(road)
    nextroad = pickNextRoad(nextroads)
    # There are no next roads -> remove the car
    if nextroad is None:
        removeCar(car, atom3i)
        return True
    # Check if the constraints will not be violated if the car should move
    if checkMaxCarsFromRoad(nextroad, car) is None:
        connectToNextRoad(car, nextroad, atom3i)
        road.cars.setValue(road.cars.getValue() - 1)
        road.graphObject_.ModifyAttribute('cars', road.cars.getValue())
        return True
    return False


def trafficLightIsGreen(road):
    """Tell whether a car may leave ``road`` as far as its light goes.

    Returns True when the road has no traffic light or the light's selected
    colour index is 2 (green); False for any other index.
    """
    light = getTrafficLightFromRoad(road)
    # There is no traffic light
    if light is None:
        return True
    [colors, selected] = light.color.getValue()
    # Compare by value; ``is 2`` tests identity, not equality.
    return selected == 2


def moveCarIfGreen(car, atom3i):
    """Advance ``car`` one step if its traffic light permits it.

    Returns False when the car is not connected to a road.  With a green
    light (or no light) the result of moveCar is returned.  With a red or
    orange light the car stays put and the return value tells whether it
    could proceed once the light turns green: True when no constraint on the
    next road would be violated, False otherwise.
    """
    road = getRoadFromCar(car)
    # Car not connected with a road
    if road is None:
        return False
    # Only move if traffic light is green or if there is no light
    if trafficLightIsGreen(road):
        return moveCar(car, road, atom3i)
    # It's red or orange light
    nextroad = pickNextRoad(getNextRoads(road))
    if nextroad is None:
        # No road to check constraints on.  moveCar removes the car in this
        # situation and reports True, so the waiting car still counts as
        # pending activity instead of crashing the constraint lookup.
        return True
    # The last car can be waiting in front of a red or orange light and can
    # move on when it's green, unless a constraint would be violated.
    return checkMaxCarsFromRoad(nextroad, car) is None


def generateCar(gen, atom3i):
    """Let generator ``gen`` emit one car onto the road behind it.

    The target is the last node found by walking
    gen -> link -> port -> link -> object.  A new car is created and
    connected to it when no constraint on the target would be violated; the
    generator's ``remaining_cars`` counter and displayed attribute are then
    decremented.  If a constraint blocks the car, the new car is removed
    again.

    Returns True when a car was generated, False when the generator has no
    remaining cars, no target was found, or a constraint blocked the car.
    """
    if gen.remaining_cars.getValue() <= 0:
        return False

    nextroad = None
    for link in gen.out_connections_:
        for inp in link.out_connections_:
            for link2 in inp.out_connections_:
                for obj in link2.out_connections_:
                    nextroad = obj
    if nextroad is None:
        return False

    newcar = createNewTraffic_Car(atom3i, 0, 0)
    # Check if the constraints will not be violated if a car should be generated
    if checkMaxCarsFromRoad(nextroad, newcar) is None:
        connectToNextRoad(newcar, nextroad, atom3i)
        gen.remaining_cars.setValue(gen.remaining_cars.getValue() - 1)
        gen.graphObject_.ModifyAttribute('remaining_cars', gen.remaining_cars.getValue())
        return True
    removeCar(newcar, atom3i)
    return False


def changeLight(trafficlight):
    """Count the light's timer down and switch colour when it has expired.

    The timer is always decremented by one.  If the value read before the
    decrement was zero or less, the colour index advances
    0 (red) -> 2 (green) -> 1 (orange) -> 0 (red), the timer is reset for the
    new colour and the displayed 'color' attribute is updated.  An
    unexpected colour index leaves the colour unchanged.
    """
    timer = trafficlight.timer.getValue()
    trafficlight.timer.setValue(timer - 1)
    if timer > 0:
        return

    [colors, selected] = trafficlight.color.getValue()
    transitions = {0: 2, 1: 0, 2: 1}
    newvalue = transitions.get(selected)
    if newvalue is None:
        # Unknown colour index: storing None as the selection and indexing
        # colors[None] would corrupt the light and raise TypeError.
        return
    trafficlight.color.setValue((['red', 'orange', 'green'], newvalue))
    resetTrafficLightTimer(trafficlight)
    trafficlight.graphObject_.ModifyAttribute('color', colors[newvalue])


def oneTimeSlot(atom3i):
    """Run one simulation step over all cars, generators and lights.

    Every Traffic_Car is offered a move, every Traffic_Generator is asked to
    generate a car, and every Traffic_TrafficLight is ticked.

    Returns True when at least one car move or car generation reported
    activity, False otherwise.  Light changes do not count as activity.
    """
    somethinghappened = False

    # All cars move 1
    cars = atom3i.ASGroot.listNodes['Traffic_Car']
    for car in cars:
        if moveCarIfGreen(car, atom3i):
            somethinghappened = True

    # All generators generate a car
    gens = atom3i.ASGroot.listNodes['Traffic_Generator']
    for gen in gens:
        if generateCar(gen, atom3i):
            somethinghappened = True

    # All trafficlights change light (if timer expires)
    trafficlights = atom3i.ASGroot.listNodes['Traffic_TrafficLight']
    for trafficlight in trafficlights:
        changeLight(trafficlight)

    return somethinghappened


class simulator(threading.Thread):
    """Thread that repeatedly runs oneTimeSlot on an ATOM3 instance."""

    def __init__(self, atom3i):
        """Store ``atom3i`` and initialise all light timers and generators."""
        threading.Thread.__init__(self)
        self.atom3i = atom3i
        initAllTrafficLightTimers(atom3i)
        initAllGenerators(atom3i)

    def run(self):
        """Sleep one second, run a time slot, and stop once a slot is idle."""
        done = False
        while not done:
            time.sleep(1)
            done = not oneTimeSlot(self.atom3i)


def simulate(atom3i):
    """Create a simulator thread for ``atom3i`` and start it."""
    sim = simulator(atom3i)
    sim.start()