import sumolib
from math import dist, ceil, sqrt, log, cos, sin, atan2, pi
from random import randint, uniform
from itertools import islice
import time
from globalImport import globalImport


def pysideImports():
    """Pull the PySide6 names used for drawing/signalling in through globalImport.

    globalImport is called with this module's globals(), a module path and a
    list of names; presumably it binds those names in the module globals so
    that QPointF, QColor, Qt, Signal, QObject... become usable afterwards
    (confirm against globalImport).
    """
    globalImport(globals(), "PySide6.QtGui", ["QPainter", "QColor"])
    globalImport(globals(), "PySide6.QtCore", ["QPointF", "Signal", "QObject", "Qt"])

    # NOTE(review): as laid out here the class is local to this function, so
    # the bare name `updateSignals` used in Car.__init__ would not resolve
    # unless something else publishes it. If it belongs at module level
    # instead, QObject/Signal would have to exist at import time, before
    # pysideImports() has run. Confirm the intended placement.
    class updateSignals(QObject):
        # Signal carrying a tuple; connected to infoWidg.setVal in Car.__init__.
        updateDisp = Signal(tuple)
        # Signal carrying a tuple; connected to infoWidg.addSpeedPoint in Car.__init__.
        addGraphPt = Signal(tuple)


class Car():
    """One simulated vehicle moving along a list of network edges.

    The car keeps its current route index, its position on the current lane
    polyline and its speed. Each call to update() looks for a leader ahead and
    a conflicting car at upcoming intersections, picks a new speed in
    conduiteKrauss() and advances the position. Display-related code is only
    active when an info widget is supplied or draw() is called.
    """

    # Turn-indicator states stored per route entry in self.cligno.
    CLIGNO_NONE = 0
    CLIGNO_LEFT = 1
    CLIGNO_RIGHT = 2

    def getShape(self, edgeInd):
        """Return (lane shape, lane speed, lane id) for route[edgeInd].

        The lane id is always 0: only the first lane of each edge is used.
        """
        startEdge = self.route[edgeInd]
        laneId = 0
        lane = startEdge.getLane(laneId)
        vmax = lane.getSpeed()
        laneShape = lane.getShape()
        return (laneShape, vmax, laneId)

    def initPath(self):
        """Load shape, speed limit and lane id of the edge at self.index.

        When an info widget is attached, also emits the "Index" and "Edge"
        display updates.
        """
        newLane = self.getShape(self.index)
        self.laneShape = newLane[0]
        self.vmax = newLane[1]
        self.laneId = newLane[2]
        if self.infoWidg is None:
            return
        self.signals.updateDisp.emit(("Index",f"{self.index}/{len(self.route)}"))
        self.signals.updateDisp.emit(("Edge",self.route[self.index]))
        #print(f"{self.id} : {startEdge.getID()} -> {nextEdge.getID()} via {laneId}")

    def __init__(self,carID,route,startTime,dynSpeed,IA,parentMap,parentController,infoWidg):
        """Create a car; the route itself is resolved later by prepareRoute().

        Args:
            carID: stored as self.id and used to tell cars apart.
            route: raw route, kept in self.rawRoute; each item is later passed
                to parentMap.getEdge().
            startTime: converted to float; update() does nothing while
                controller.t is below it.
            dynSpeed: the string '1' means True, any other string False;
                non-string values are stored unchanged.
            IA: same string handling as dynSpeed; when truthy self.T is 0.01,
                otherwise a random value in [0.9, 1.6].
            parentMap: object providing getEdge() / getLane().
            parentController: object providing t, getCarsOnLane(),
                getCarsOnEdge(), destroyCar(), dynSpeedRat, vroomEnable.
            infoWidg: optional widget with setVal / addSpeedPoint; when None
                no PySide import happens here and no signal is created.
        """
        self.id=carID
        self.map=parentMap
        self.controller=parentController
        self.infoWidg=infoWidg
        self.index=0      # index of the current edge in self.route
        self.laneInd=0    # index of the current segment in self.laneShape
        self.route=[]
        self.laneShape=None
        self.laneId=0
        self.leader=None
        self.leaderAtInter=None
        self.leaderDist=0
        self.leaderAtInterDist=0
        self.startTime=float(startTime)
        self.dynSpeed=(dynSpeed == '1') if isinstance(dynSpeed, str) else dynSpeed
        self.IA=(IA == '1') if isinstance(IA, str) else IA
        self.cligno=[]
        self.isLeader=0
        self.leaderStopped=0
        self.pos=[0,0]
        self.dir=0
        self.v=0
        self.a=10         # used as acceleration in conduiteKrauss / calcTti
        self.b=20         # used as deceleration in conduiteKrauss
        self.minSpace=10
        self.interMinSpace=10
        self.distToInter=0
        self.timeStopped=0
        self.speedPercentage=0
        self.ticksLived=0
        self.timeAtInter = 0
        self.forceThrough = False
        self.vmax=0
        self.alpha = 0.1
        self.beta = 0.1
        self.nu = 0       # subtracted from the chosen speed in conduiteKrauss
        self.gamma = 10
        self.delta = 0
        #self.T = uniform(0.9,1.6) if not self.IA else 0.01
        self.T = uniform(0.9,1.6) if not self.IA else 0.01
        self.size = 3
        self.vroom = 0
        self.rawRoute = route
        self.imported = False
        # Headless use: stop before touching anything Qt-related.
        if infoWidg is None:
            return
        pysideImports()
        self.imported = True
        self.signals=updateSignals()
        self.signals.updateDisp.connect(self.infoWidg.setVal)
        self.signals.addGraphPt.connect(self.infoWidg.addSpeedPoint)
        self.signals.updateDisp.emit(("Position",self.pos))
        self.signals.updateDisp.emit(("Vitesse",self.v))
        self.signals.updateDisp.emit(("Index",f"{self.index}/{len(route)}"))

    def prepareRoute(self):
        """Resolve self.rawRoute into self.route and self.cligno, then place the car.

        For every consecutive pair of edges the first edge is appended, then
        the internal edge(s) reached through the first connection between
        them. self.cligno receives one indicator value per appended edge.
        Prints a message and returns early when an edge cannot be resolved.
        """
        route = list(map(self.map.getEdge,self.rawRoute))
        if None in route:
            print("error planning route, mismatched net/demand?")
            return
        # NOTE(review): the pairwise loop appends `r` but never the final
        # edge of `route`, so self.route ends on the last internal edge.
        # Confirm this is intended.
        for r,rn in zip(route,route[1:]):
            self.route.append(r)
            conn=r.getConnections(rn)
            if(len(conn)==0):
                # NOTE(review): `r` was appended to self.route above but
                # nothing is appended to self.cligno on this path, so the two
                # lists stop being index-aligned from here on.
                continue
            if conn[0].getDirection() == "l":
                self.cligno.append(self.CLIGNO_LEFT)
            elif conn[0].getDirection() == "r":
                self.cligno.append(self.CLIGNO_RIGHT)
            else:
                self.cligno.append(self.CLIGNO_NONE)
            edge=self.map.getLane(conn[0].getViaLaneID()).getEdge()
            self.route.append(edge)
            self.cligno.append(self.CLIGNO_NONE)
            secEdge=edge.getConnections(rn)[0].getViaLaneID()
            # Sometimes (no idea why) the internal edges are split while the
            # connection is still recorded, so as a workaround follow the
            # chain of via-lanes until it ends.
            while secEdge!="":
                edge=self.map.getLane(secEdge).getEdge()
                self.route.append(edge)
                self.cligno.append(self.CLIGNO_NONE)
                secEdge=edge.getConnections(rn)[0].getViaLaneID()
        self.initPath()
        self.pos=list(self.laneShape[0])

    def getLeader(self, maxDist):
        """Search along the route for the closest car ahead within maxDist.

        On success stores the distance in self.leaderDist and returns the car;
        otherwise returns None.

        NOTE(review): part of this method is missing from the text (see the
        loop header below); the description above only covers what is still
        visible.
        """
        shapeInd = self.laneInd
        edgeInd = self.index
        prevInd = edgeInd
        laneShape = self.laneShape
        laneId = self.laneId
        l = 0
        carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
        carsHere = list(filter(lambda c: c.id != self.id, carsHere))
        # NOTE(review): the statement below is not valid Python as it stands
        # (`l=len(...)` inside `while(...)`), and `closest` is read further
        # down without being assigned anywhere in this method. Text between
        # `l` and `=len(laneShape)-1` appears to have been lost, and the
        # `else:` presumably belongs to an `if` that is no longer visible.
        # Restore the original from version control; the layout of this loop
        # is a best guess and must not be trusted.
        while(l=len(laneShape)-1):
            shapeInd = 0
            edgeInd+=1
            if(not self.route[edgeInd].isSpecial()):
                prevInd = edgeInd
            carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
            # Don't ask why, but without converting to a list here the
            # original filter gets modified.
            carsHere = list(filter(lambda c: c.id != self.id, carsHere))
            if(edgeInd>=len(self.route)-1):
                return
            newLane = self.getShape(edgeInd)
            laneShape = newLane[0]
            laneId = newLane[2]
        else:
            if l == 0:
                l+=dist(self.pos, closest.pos)
            else:
                l+=dist(laneShape[shapeInd], closest.pos)
            if l <= maxDist:
                self.leaderDist=l
                return closest
            else:
                return
        return

    def getCurrentEdge(self):
        """Return the route entry at self.index."""
        return self.route[self.index]

    def getCarDist(self, car, edge, laneInd, startFromEnd = True):
        """Return a distance along a lane polyline up to `car`.

        Walks the shape of edge.getLane(laneInd) segment by segment, summing
        segment lengths until the segment whose index matches car.laneInd; for
        that segment the distance from its first visited point to car.pos is
        added and the total returned. With startFromEnd the shape is walked in
        reverse and the segment index is mirrored accordingly. If no segment
        matches, the full polyline length is returned.
        """
        lanes = edge.getLane(laneInd).getShape().copy()
        if startFromEnd:
            lanes.reverse()
        cDist = 0
        for i,l in enumerate(lanes[:-1]):
            # len(lanes)-i-2 maps the reversed position i back to the
            # forward segment index that car.laneInd uses.
            if car.laneInd != (i if not startFromEnd else len(lanes)-i-2):
                cDist += dist(l, lanes[i+1])
            else:
                cDist += dist(l, car.pos)
                return cDist
        return cDist

    def getDistToEndOfEdge(self):
        """Return the remaining polyline length from self.pos to the end of the current lane shape."""
        lgt = 0
        lgt += dist(self.pos, self.laneShape[self.laneInd+1])
        for p,n in zip(self.laneShape[self.laneInd+1:], self.laneShape[self.laneInd+2:]):
            lgt += dist(p,n)
        return lgt

    def getLeaderAtIntersections(self, maxDist):
        """Look for a conflicting car at the upcoming intersections within maxDist.

        Starting after the current edge, each following route index is handed
        to getLeaderAtIntersection(). On the first hit, self.distToInter and
        self.leaderAtInterDist are updated and the car is returned. Returns
        None when the distance budget or the route is exhausted.
        """
        l = self.getDistToEndOfEdge()
        edgeInd = self.index + 1
        while l < maxDist:
            carComing = self.getLeaderAtIntersection(edgeInd - 1,edgeInd)
            if carComing is not None:
                self.distToInter = l
                self.leaderAtInterDist = carComing[0]
                return carComing[1]
            l += self.route[edgeInd].getLength()
            edgeInd += 1
            if edgeInd >= len(self.route):
                return
        return

    def getLeaderAtIntersection(self, prevInd, edgeInd):
        """Find the closest car on a conflicting approach to one intersection.

        Returns a (distance, car) tuple, or None when nothing relevant is
        found or the junction data cannot be resolved.
        """
        # Get the edges just before and just after the intersection
        while(self.route[edgeInd].isSpecial()):
            # NOTE(review): this unconditional return makes the rest of the
            # loop body unreachable; the method yields None whenever
            # route[edgeInd] is special. Looks like a deliberate short-circuit
            # left in place - confirm.
            return
            edgeInd = edgeInd + 1
            if edgeInd >= len(self.route):
                print(self.id,"fu")
                return None
        while self.route[prevInd].isSpecial():
            prevInd -= 1
            if prevInd < 0:
                print(self.id, "fu2")
                return None
        # Get the connections between the previous edge and the current one
        # and keep only the first; if there is none, give up
        inter = self.route[edgeInd-1].getFromNode()
        connections = self.route[prevInd].getConnections(self.route[edgeInd])
        if(len(connections)==0):
            print("pas de connections")
            return None
        connection = connections[0]
        # Get the priority matrix for our route
        linkInd = inter.getLinkIndex(connection)
        if(linkInd == -1): # Should not happen, but evidently it does
            print(self.id, "fu3")
            return;
        resp = inter._prohibits[linkInd] # If I remember correctly, names prefixed with _ are not supposed to be touched?
        # Build a lookup from link index to connection
        connRaw = inter.getConnections()
        conn = [0] * len(resp)
        for c in connRaw:
            ind = inter.getLinkIndex(c)
            if(ind == -1):
                continue
            conn[ind] = c
        # For each connection we do not have priority over, look for cars
        # and, as usual, keep the closest one
        cars = []
        # resp is walked in reverse; entries equal to '0' are skipped.
        for i,f in enumerate(reversed(resp)):
            if(f == '0'):
                continue
            edge = conn[i].getFrom()
            laneInd = conn[i].getFromLane().getIndex()
            intLane = self.map.getLane(conn[i].getViaLaneID())
            intLaneLgt = intLane.getLength()
            # Cars on the approach lane that are heading to this connection's target edge.
            carsInEdge = self.controller.getCarsOnLane(edge.getID(), laneInd)
            carsInEdge = list(filter(lambda c: c.nextNonSpecialEdge() == conn[i].getTo(), carsInEdge))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, edge, laneInd)+intLaneLgt for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cars.append(closest)
            # Cars already on the internal lane of this connection.
            intEdge = intLane.getEdge()
            carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex()) for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cars.append(closest)
            # Also search the edges of the previous node and the edges before it
            prevLanesLgt = intLaneLgt + conn[i].getFromLane().getLength()
            prevNode = edge.getFromNode()
            for intLaneID in prevNode.getInternal():
                intLane = self.map.getLane(intLaneID)
                intEdge = intLane.getEdge()
                carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
                if len(carsInEdge) != 0:
                    carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex())+prevLanesLgt for c in carsInEdge], carsInEdge)
                    closest = min(carsInEdge, key=lambda c: c[0])
                    cl = closest[1]
                    # TODO: the lookahead offset is an arbitrary number (the
                    # original note mentions 10, the code passes 3)
                    if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo():
                        cars.append(closest)
            for incEdge in prevNode.getIncoming():
                if incEdge.isSpecial():
                    continue
                incLane = incEdge.getLane(0)
                intConn = incEdge.getConnections(edge)
                intLaneLgt = 0
                if len(intConn) != 0:
                    intLaneLgt = self.map.getLane(intConn[0].getViaLaneID()).getLength()
                carsInEdge = list(self.controller.getCarsOnEdge(incEdge.getID()))
                #carsInEdge = list(filter(lambda c: c.nextNonSpecialEdge() == edge and c.nextNonSpecialEdge(3) == conn[i].getTo() and (c.leader is None or c.leader.getCurrentEdge().getID() != c.getCurrentEdge().getID()), carsInEdge))
                if len(carsInEdge) != 0:
                    carsInEdge = zip([self.getCarDist(c, incEdge, incLane.getIndex())+prevLanesLgt+intLaneLgt for c in carsInEdge], carsInEdge)
                    closest = min(carsInEdge, key=lambda c: c[0])
                    cl = closest[1]
                    # TODO: same as above (these two blocks should be merged)
                    if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo():
                        cars.append(closest)
        if(len(cars) == 0):
            return None
        cDist,closest = min(cars, key=lambda c: c[0])
        return (cDist,closest)

    def turnNext(self):
        """Return True when the indicator stored for the current route index is not CLIGNO_NONE."""
        return self.cligno[self.index] != self.CLIGNO_NONE

    def nextNonSpecialEdge(self, startOffset = 1):
        """Return the first route edge at or after index + startOffset whose isSpecial() is false, or None."""
        res = None
        try:
            res = next(filter(lambda e: not e.isSpecial(), islice(self.route, self.index + startOffset, None)))
        except StopIteration:
            return
        else:
            return res

    def draw(self,painter, colorOverride):
        """Paint the car on `painter`.

        Draws a circle at the car position, a short heading line, a yellow dot
        on the indicator side when the current route entry has one, and the
        "vroom" text while self.vroom is non-zero (it counts down by one per
        call). When colorOverride is truthy the pen is left untouched.
        """
        # Lazy import so that cars created without a widget can still be drawn.
        if not self.imported:
            pysideImports()
            self.imported = True
        pt = QPointF(*self.pos)
        if colorOverride:
            pass
        elif self.forceThrough:
            painter.setPen(Qt.blue)
        elif self.isLeader > 0:
            painter.setPen(QColor(255,0,255))
        elif self.leader is None:
            painter.setPen(Qt.gray)
        painter.drawEllipse(pt,self.size,self.size)
        painter.drawLine(self.pos[0], self.pos[1], self.pos[0]+5*cos(self.dir), self.pos[1]+5*sin(self.dir))
        if len(self.cligno) != 0 and self.cligno[self.index] != self.CLIGNO_NONE:
            painter.save()
            painter.setPen(Qt.yellow)
            # Offset perpendicular to the heading; flipped by pi for CLIGNO_RIGHT.
            d = self.dir + pi/2
            if self.cligno[self.index] == self.CLIGNO_RIGHT:
                d += pi
            painter.drawEllipse(pt + self.size * QPointF(cos(d),sin(d)), 3, 3)
            painter.restore()
        if self.vroom != 0:
            painter.save()
            d=(60-self.vroom)*0.2
            painter.translate(pt + QPointF(0,d))
            painter.scale(1,-1)
            font = painter.font(); font.setPixelSize(ceil(self.vroom*0.2)); painter.setFont(font)
            painter.drawText(QPointF(0,0),"vroom")
            self.vroom -= 1
            painter.restore()
        #elSize = self.v * self.T
        #painter.drawEllipse(pt,elSize, elSize)

    def calcTti(self, dist, v0, vmax, a):
        """Return (time to cover `dist`, speed on arrival) under constant acceleration capped at vmax.

        Args:
            dist: distance to cover. NOTE(review): this parameter shadows
                math.dist inside the method (math.dist is not used here).
            v0: current speed.
            vmax: speed cap.
            a: acceleration; used as a divisor, so it must be non-zero.
        """
        ttms = (vmax - v0)/a # "time to max speed": time for the car to reach vmax from its current speed
        delta = v0**2 + 2*a*dist # discriminant of the quadratic giving the time needed to cover the distance dist
        tti = (-v0 + sqrt(delta))/a # "time to intersection": time to reach the intersection (dist) if there were no vmax
        sai = v0 + tti*a # "speed at intersection": speed the car will have on arrival
        if tti > ttms: # if vmax is reached before the intersection the computation above is wrong
            dbvm = a/2 * ttms**2 + v0 * ttms # "distance before vmax": distance covered before reaching vmax
            tti = ttms + (dist - dbvm) / vmax # time needed = time to reach vmax + time to cover the remainder at vmax
            sai = vmax
        return (tti, sai)

    # Choose the new speed self.v for this tick (the method name refers to the
    # Krauss car-following model; NOTE(review): the formulas have not been
    # checked against a reference).
    #   vmax:          speed cap for this tick.
    #   leader:        car ahead on the route or None (uses self.leaderDist).
    #   leaderAtInter: conflicting car at an upcoming intersection or None
    #                  (uses self.distToInter / self.leaderAtInterDist).
    #   dt:            time step.
    # The result is min(accelerated speed, vsec, vmax, vsecInter) minus
    # self.nu, floored at 0. The string that follows is disabled debug code
    # kept by the author; it is the first statement of the method and so also
    # serves as its docstring.
    def conduiteKrauss(self, vmax, leader, leaderAtInter, dt):
        """if self.id == "f_00" and self.controller.t%10>5:
            self.v = 0
            return
        if self.id == "v_0" and self.controller.t > 5:
            self.v = 0
            self.updateGraph(self.leaderDist, self.distToInter, 0)
            return
        """
        # `False and ...` keeps this debug branch permanently disabled.
        if False and ("f_0" in self.id or "f_1" in self.id or "f_2" in self.id):
            self.v = 0
            return
        if leaderAtInter is None:
            self.forceThrough = False
        if leader is None and leaderAtInter is None:
            # Free road: accelerate up to vmax.
            # NOTE(review): this path returns without calling updateGraph().
            vd = min(self.v + self.a * dt, vmax)
            self.v = max(0, vd-self.nu)
            return
        vsecInter = vmax
        # if we are at an intersection
        if(leaderAtInter is not None):
            if self.v == 0:
                self.timeAtInter += dt
            else:
                self.timeAtInter = 0
            vleader = leaderAtInter.v
            # compute the time we will take to reach the intersection
            # and the time the leader will take
            lvmax = leaderAtInter.vmax
            if leaderAtInter.getCurrentEdge().isSpecial():
                lvmax = leaderAtInter.nextNonSpecialEdge().getSpeed()
            # For this car we would ideally compute the duration from the
            # speed on each section, but for now just use the speed on the
            # internal section (usually the slowest).
            nextInternalIndex = self.index
            while not self.route[nextInternalIndex].isSpecial():
                nextInternalIndex += 1
            tti, sai = self.calcTti(self.distToInter, self.v, self.route[nextInternalIndex].getSpeed(), self.a)
            ltti, lsai = self.calcTti(self.leaderAtInterDist, vleader, lvmax, leaderAtInter.a)
            lta = (lvmax-vleader) / leaderAtInter.a # time during which the leader accelerates (i.e. we gain no relative speed) (assumes leader.a == self.a)
            marg = lta + (lsai-sai) / self.a # margin needed to accelerate after the intersection without the leader catching up
            tts = self.v/self.b # "time to stop": time to come to a halt if we brake now
            dts = self.v*tts - (self.b*tts**2)/2 # "distance to stop": distance covered during tts while braking
            #print(self.distToInter, self.minSpace, dts)
            # If we are stuck in a circular dependency
            # NOTE(review): from here on the code reads self.leader /
            # self.leaderAtInter instead of the `leader` / `leaderAtInter`
            # parameters; update() passes the same objects, other callers
            # might not.
            maxInterTime = 0
            if self.leader is None and self.timeAtInter >= 0:
                cd, maxInterTime = self.circularLeaderDep(self.leaderAtInter, 0, 20)
                if cd and self.timeAtInter >= maxInterTime:
                    self.forceThrough = True
            # Or if our leader is itself blocked behind another leader
            if self.leader is None and self.leaderAtInter.v == 0 and self.leaderAtInter.leaderAtInter is not None and self.leaderAtInter.leaderAtInterDist > self.interMinSpace and maxInterTime != -1:
                self.forceThrough = True
            # if we are far enough from the intersection (i.e. the leader
            # does not matter) or if we have time to reach the intersection
            # before the leader (plus a margin to keep a safety distance)
            # then accelerate to merge in
            #print(tti, leader.T, marg, ltti)
            if self.distToInter > self.route[nextInternalIndex].getLength() + self.minSpace + dts or (tti + leaderAtInter.T + 0.1 + marg) < ltti:
                vsecInter = min(vmax, self.v + self.a*dt)
                #print(self.id, "ca passe")
            else: # otherwise brake
                vsecInter = max(0, self.v - self.b*dt)
            if self.forceThrough:
                vsecInter = min(vmax, self.v + self.a*dt)
        vsec = vmax
        if leader is not None:
            # Speed bound derived from the gap to the leader, both speeds and
            # the averaged decelerations.
            vleader = leader.v
            bleader = leader.b
            vb = (vleader + self.v) / 2
            bb = (bleader + self.b) / 2
            vsec = vleader + (self.leaderDist - vleader * self.T - self.minSpace)/((vb/bb) + self.T)
        vd = min(self.v + self.a * dt, vsec, vmax, vsecInter)
        self.v = max(0, vd-self.nu)
        self.updateGraph(self.v, vmax, vsec, vsecInter)

    # check whether there is a circular leader dependency
    def circularLeaderDep(self, car, maxTimeStopped, timeout):
        """Follow leader / leaderAtInter links from `car` looking for a chain that leads back to self.

        Returns a (found, value) tuple:
          * (False, 0) when `car` is None or the recursion budget `timeout`
            is used up;
          * (False, -1) when a car in the chain has forceThrough set;
          * (True, t) when the chain reaches a car with self.id, where t is
            the largest timeAtInter seen along that chain.
        """
        if timeout <= 0:
            return False, 0
        if car is None:
            return False, 0
        if car.forceThrough:
            return False, -1
        maxTimeStopped = max(maxTimeStopped, car.timeAtInter)
        if car.id == self.id:
            return True, maxTimeStopped
        timeout -= 1
        res,mts = self.circularLeaderDep(car.leader, maxTimeStopped, timeout)
        res2,mts2 = self.circularLeaderDep(car.leaderAtInter, maxTimeStopped, timeout)
        maxTimeStopped = max(mts, mts2)
        # A -1 from either branch (a forceThrough car) takes precedence.
        if min(mts, mts2) == -1:
            return False, -1
        return (res or res2), maxTimeStopped

    def updateGraph(self, v, vmax, vsec, interVsec):
        """Emit one graph point per value (series 2, 0, 1, 3) at controller.t; no-op without a widget."""
        if self.infoWidg is None:
            return
        self.signals.addGraphPt.emit((2,self.controller.t,v))
        self.signals.addGraphPt.emit((0,self.controller.t,vmax))
        self.signals.addGraphPt.emit((1,self.controller.t,vsec))
        self.signals.addGraphPt.emit((3,self.controller.t,interVsec))

    def update(self,dt):
        """Advance the car by one time step `dt`.

        Does nothing before startTime. Otherwise refreshes leader and
        leaderAtInter (both searched up to 100), picks the new speed, updates
        the statistics counters and moves the car along the lane polyline,
        switching to the next route edge when the current one ends.
        """
        if self.controller.t < self.startTime:
            return
        """
        if self.v < 1 and self.getCurrentEdge().isSpecial():
            print(f"{self.id} stalled where he souldn't have")
        """
        self.leader=self.getLeader(100)
        self.leaderAtInter=self.getLeaderAtIntersections(100)
        vmax = self.vmax
        if self.dynSpeed:
            # Scale the limit by controller.dynSpeedRat, capped at the lane
            # limit and floored at 8.
            vmax = max(min(self.vmax * self.controller.dynSpeedRat, self.vmax), 8)
        self.conduiteKrauss(vmax,self.leader,self.leaderAtInter,dt)
        if self.v == 0:
            self.timeStopped += dt
        # NOTE(review): divides by self.vmax; raises ZeroDivisionError if the
        # lane speed is 0.
        self.speedPercentage += self.v/self.vmax
        self.ticksLived += 1
        lgt=self.v*dt    # distance still to travel during this tick
        self.dir = atan2(self.laneShape[self.laneInd+1][1]-self.pos[1],self.laneShape[self.laneInd+1][0]-self.pos[0])
        while(lgt>0):
            endPos=self.laneShape[self.laneInd+1]
            l=dist(self.pos,endPos)
            if lgt>=l:
                # The remaining distance reaches past the end of this
                # segment: consume it and move on to the next one.
                lgt-=l
                # NOTE(review): `pos` is a local that is never read;
                # possibly meant to be self.pos.
                pos=list(self.laneShape[-1])
                self.laneInd+=1
                if(self.laneInd>=len(self.laneShape)-1):
                    self.laneInd=0
                    self.index+=1
                    if(self.index>=len(self.route)-1):
                        # NOTE(review): after destroyCar() execution carries
                        # on with index reset to 0 - confirm this is intended.
                        self.index=0
                        self.controller.destroyCar(self)
                    self.initPath()
                self.pos=list(self.laneShape[self.laneInd])
                continue
            # Otherwise move part-way along the current segment.
            adv=lgt/l
            self.pos[0]+=(endPos[0]-self.pos[0])*adv
            self.pos[1]+=(endPos[1]-self.pos[1])*adv
            lgt=0
        if self.controller.vroomEnable and randint(0,100) == 0:
            self.vroom = 60
        if self.infoWidg is None:
            return
        self.signals.updateDisp.emit(("Position", self.pos))
        self.signals.updateDisp.emit(("Vitesse", self.v))
        self.signals.updateDisp.emit(("Leader", f"{self.leader.id if self.leader is not None else 'None'} @ {self.leaderDist:.2f}m : {self.leaderAtInter.id if self.leaderAtInter is not None else 'None'} @ {self.leaderAtInterDist:.2f})"))

    def __copy__(self):
        """Return a new Car built from the same constructor arguments.

        route and cligno are shared with the original; pos and laneShape are
        copied; laneId and vmax are carried over. Other state (index, laneInd,
        v, ...) keeps the constructor defaults.

        NOTE(review): self.laneShape.copy() fails while laneShape is still
        None (i.e. before prepareRoute() has succeeded).
        """
        copy = Car(self.id, self.rawRoute, self.startTime, self.dynSpeed, self.IA, self.map, self.controller, self.infoWidg)
        copy.route = self.route
        copy.cligno = self.cligno
        copy.pos = self.pos.copy()
        copy.laneShape = self.laneShape.copy()
        copy.laneId = self.laneId
        copy.vmax = self.vmax
        return copy