import time
from itertools import islice
from math import dist, ceil, sqrt, log, cos, sin, atan2, pi
from random import randint, uniform

import sumolib
from PySide6.QtCore import QPointF, Signal, QObject, Qt
from PySide6.QtGui import QPainter, QColor


class updateSignals(QObject):
    """Qt signals through which a car pushes values to its info widget."""

    # Emitted with a (label, value) tuple; connected to infoWidg.setVal.
    updateDisp = Signal(tuple)
    # Emitted with a (series, time, value) tuple; connected to
    # infoWidg.addSpeedPoint.
    addGraphPt = Signal(tuple)


class Car():
    """A simulated vehicle that follows a route, edge by edge.

    The map and edge objects are accessed through getEdge / getLane /
    getConnections / getViaLaneID; presumably these are sumolib network
    objects -- confirm against the caller.
    """

    # Turn-signal ("clignotant") states, stored per route entry in self.cligno.
    CLIGNO_NONE = 0
    CLIGNO_LEFT = 1
    CLIGNO_RIGHT = 2

    def getShape(self, edgeInd):
        """Return (laneShape, vmax, laneId) for lane 0 of route[edgeInd]."""
        startEdge = self.route[edgeInd]
        laneId = 0  # lane choice is hard-coded to the first lane
        lane = startEdge.getLane(laneId)
        vmax = lane.getSpeed()
        laneShape = lane.getShape()
        return (laneShape, vmax, laneId)

    def initPath(self):
        """Load shape, speed limit and lane id of the current route edge."""
        newLane = self.getShape(self.index)
        self.laneShape = newLane[0]
        self.vmax = newLane[1]
        self.laneId = newLane[2]
        if self.infoWidg is None:
            return
        self.signals.updateDisp.emit(("Index",f"{self.index}/{len(self.route)}"))
        self.signals.updateDisp.emit(("Edge",self.route[self.index]))

    def __init__(self,carID,route,startTime,dynSpeed,IA,parentMap,parentController,infoWidg):
        """Create a car; call prepareRoute() before the first update().

        carID            -- identifier, compared against other cars' ids.
        route            -- raw route entries, resolved with
                            parentMap.getEdge in prepareRoute().
        startTime        -- converted to float; update() does nothing while
                            controller.t is below it.
        dynSpeed, IA     -- booleans, or strings where '1' means True.
        parentMap        -- network object (getEdge / getLane).
        parentController -- simulation controller (t, getCarsOnLane, ...).
        infoWidg         -- optional widget exposing setVal and
                            addSpeedPoint; None disables all signal output.
        """
        self.id=carID
        self.map=parentMap
        self.controller=parentController
        self.infoWidg=infoWidg
        self.index=0        # index of the current edge in self.route
        self.laneInd=0      # index of the current segment in self.laneShape
        self.route=[]
        self.laneShape=None
        self.laneId=0
        self.leader=None
        self.leaderAtInter=None
        self.leaderDist=0
        self.leaderAtInterDist=0
        self.startTime=float(startTime)
        self.dynSpeed=(dynSpeed == '1') if isinstance(dynSpeed, str) else dynSpeed
        self.IA=(IA == '1') if isinstance(IA, str) else IA
        self.cligno=[]      # turn signal per route entry, parallel to self.route
        self.isLeader=0
        self.leaderStopped=0
        # Read by conduite() but never assigned anywhere in this file;
        # initialised here so that method cannot raise AttributeError.
        self.leaderBefore=False
        self.pos=[0,0]
        self.dir=0
        self.v=0
        self.a=10           # acceleration applied as v + a*dt
        self.b=20           # deceleration applied as v - b*dt
        self.minSpace=10
        self.interMinSpace=20
        self.distToInter=0
        self.timeStopped=0
        self.speedPercentage=0
        self.ticksLived=0
        self.timeAtInter = 0
        self.forceThrough = False
        self.vmax=0
        self.alpha = 0.1
        self.beta = 0.1
        self.nu = 0         # subtracted from the desired speed in conduiteKrauss
        self.gamma = 10
        self.delta = 0
        # Time-headway term of the car-following formulas; "IA" cars get a
        # near-zero value.
        self.T = uniform(0.9,1.6) if not self.IA else 0.01
        self.size = 3
        self.vroom = 0      # remaining frames of the "vroom" text animation
        self.rawRoute = route
        if infoWidg is None:
            return
        self.signals=updateSignals()
        self.signals.updateDisp.connect(self.infoWidg.setVal)
        self.signals.addGraphPt.connect(self.infoWidg.addSpeedPoint)
        self.signals.updateDisp.emit(("Position",self.pos))
        self.signals.updateDisp.emit(("Vitesse",self.v))
        self.signals.updateDisp.emit(("Index",f"{self.index}/{len(route)}"))

    def prepareRoute(self):
        """Expand rawRoute into self.route (regular + internal edges).

        For every consecutive pair of raw edges, the first edge is appended,
        followed by the internal edge(s) of the connection between them.
        self.cligno receives one entry per appended edge. Prints a message
        and returns early when an edge cannot be resolved.
        """
        route = list(map(self.map.getEdge,self.rawRoute))
        if None in route:
            print("error planning route, mismatched net/demand?")
            return
        # NOTE(review): the last raw edge is never appended to self.route, so
        # the expanded route ends on the last junction's internal edge(s).
        # Kept as in the original -- confirm this is intended.
        for r,rn in zip(route,route[1:]):
            self.route.append(r)
            conn=r.getConnections(rn)
            if not conn:
                # Keep cligno aligned with route even without a connection
                # (the original skipped this entry, shifting later indices).
                self.cligno.append(self.CLIGNO_NONE)
                continue
            direction = conn[0].getDirection()
            if direction == "l":
                self.cligno.append(self.CLIGNO_LEFT)
            elif direction == "r":
                self.cligno.append(self.CLIGNO_RIGHT)
            else:
                self.cligno.append(self.CLIGNO_NONE)
            edge=self.map.getLane(conn[0].getViaLaneID()).getEdge()
            self.route.append(edge)
            self.cligno.append(self.CLIGNO_NONE)
            # Internal edges are sometimes split in several pieces while the
            # connection is still recorded, so follow the chain of via-lanes
            # until it ends.
            viaLaneId=edge.getConnections(rn)[0].getViaLaneID()
            while viaLaneId!="":
                edge=self.map.getLane(viaLaneId).getEdge()
                self.route.append(edge)
                self.cligno.append(self.CLIGNO_NONE)
                viaLaneId=edge.getConnections(rn)[0].getViaLaneID()
        self.initPath()
        self.pos=list(self.laneShape[0])

    def getLeader(self, maxDist):
        """Return the closest car ahead along the route within maxDist.

        Walks the lane polyline segment by segment, starting at the car's
        own segment and continuing onto the following route edges. On
        success self.leaderDist is set to the distance found. Returns None
        when there is no car within maxDist or the end of the route is
        reached.
        """
        shapeInd = self.laneInd
        edgeInd = self.index
        laneShape = self.laneShape
        laneId = self.laneId
        l = 0
        # Materialise the result: the controller may hand back a one-shot
        # iterator, and the list is scanned once per segment.
        carsHere = [c for c in self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
                    if c.id != self.id]
        while l < maxDist:
            # NOTE(review): the loop head and the selection of `closest` were
            # lost in the source (text between "while(l<" and
            # ">=len(laneShape)-1)" is missing). The selection below is a
            # reconstruction inferred from the surviving branches -- confirm
            # against the original file.
            origin = self.pos if l == 0 else laneShape[shapeInd]
            segEnd = laneShape[shapeInd + 1]
            reach = dist(origin, segEnd)
            onSegment = [c for c in carsHere if c.laneInd == shapeInd]
            if l == 0:
                # On our own segment only cars strictly ahead of us count.
                onSegment = [c for c in onSegment if dist(c.pos, segEnd) < reach]
            closest = min(onSegment, key=lambda c: dist(origin, c.pos), default=None)
            if closest is None:
                l += reach
                shapeInd += 1
                if shapeInd >= len(laneShape) - 1:
                    # End of this lane: continue on the next route edge.
                    shapeInd = 0
                    edgeInd += 1
                    if edgeInd >= len(self.route) - 1:
                        return None
                    newLane = self.getShape(edgeInd)
                    laneShape = newLane[0]
                    laneId = newLane[2]
                    carsHere = [c for c in self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
                                if c.id != self.id]
            else:
                l += dist(origin, closest.pos)
                if l <= maxDist:
                    self.leaderDist = l
                    return closest
                return None
        return None

    def getCurrentEdge(self):
        """Return the route edge the car is currently on."""
        return self.route[self.index]

    def getCarDist(self, car, edge, laneInd, startFromEnd = True):
        """Return the polyline distance from one end of a lane to `car`.

        The shape of lane `laneInd` of `edge` is walked from its last point
        (startFromEnd=True) or its first point, summing segment lengths
        until the segment matching car.laneInd is reached. If no segment
        matches, the full lane length is returned.
        """
        pts = edge.getLane(laneInd).getShape().copy()
        if startFromEnd:
            pts.reverse()
        cDist = 0
        for i,p in enumerate(pts[:-1]):
            # Segment index as seen by the car (shape is reversed when
            # measuring from the end).
            segInd = len(pts)-i-2 if startFromEnd else i
            if car.laneInd == segInd:
                return cDist + dist(p, car.pos)
            cDist += dist(p, pts[i+1])
        return cDist

    def _closestCar(self, cars, edge, laneInd, offset = 0):
        """Return (distance, car) for the car nearest the end of the lane.

        The distance is getCarDist(car, edge, laneInd) + offset. Returns
        None when `cars` is empty.
        """
        measured = [(self.getCarDist(c, edge, laneInd) + offset, c) for c in cars]
        if not measured:
            return None
        return min(measured, key=lambda dc: dc[0])

    def getDistToEndOfEdge(self):
        """Return the remaining polyline distance to the end of the lane."""
        lgt = dist(self.pos, self.laneShape[self.laneInd+1])
        for p,n in zip(self.laneShape[self.laneInd+1:], self.laneShape[self.laneInd+2:]):
            lgt += dist(p,n)
        return lgt

    def getLeaderAtIntersections(self, maxDist):
        """Return the nearest conflicting car at an upcoming intersection.

        Scans the route edges ahead until the accumulated distance reaches
        maxDist. When a car is found, self.distToInter and
        self.leaderAtInterDist are updated and the car is returned;
        otherwise None.
        """
        l = self.getDistToEndOfEdge()
        edgeInd = self.index + 1
        # Bounds are checked before indexing the route (the original only
        # checked after the first access).
        while l < maxDist and edgeInd < len(self.route):
            carComing = self.getLeaderAtIntersection(edgeInd - 1,edgeInd)
            l += self.route[edgeInd].getLength()
            if carComing is not None:
                self.distToInter = l
                self.leaderAtInterDist = carComing[0]
                return carComing[1]
            edgeInd += 1
        return None

    def getLeaderAtIntersection(self, prevInd, edgeInd):
        """Return (distance, car) for the nearest car we must yield to.

        route[edgeInd] must be a regular edge just after an intersection;
        prevInd is walked back to the last regular edge before it. Returns
        None when edgeInd is an internal edge, when the connection or its
        link index cannot be found, or when no conflicting car exists.
        """
        # The original looped here to skip internal edges but returned on
        # the first iteration; only that effective behaviour is kept.
        if self.route[edgeInd].isSpecial():
            return None
        while self.route[prevInd].isSpecial():
            prevInd -= 1
            if prevInd < 0:
                print(self.id, "fu2")
                return None

        # Take the first connection between the previous and current edge.
        inter = self.route[edgeInd-1].getFromNode()
        connections = self.route[prevInd].getConnections(self.route[edgeInd])
        if not connections:
            print("pas de connections")
            return None
        connection = connections[0]

        # Priority ("prohibits") row of the junction for our link.
        linkInd = inter.getLinkIndex(connection)
        if linkInd == -1:
            # Should not happen, but evidently does.
            print(self.id, "fu3")
            return None
        # NOTE(review): _prohibits is a private attribute of the node object.
        resp = inter._prohibits[linkInd]

        # Map link index -> connection; links without a connection stay None.
        conn = [None] * len(resp)
        for c in inter.getConnections():
            ind = inter.getLinkIndex(c)
            if ind == -1:
                continue
            conn[ind] = c

        # For every link flagged against us, collect the closest car on it.
        # The row is read in reverse so that position i lines up with link
        # index i (presumably the string stores the highest index first --
        # confirm against sumolib).
        cars = []
        for i,flag in enumerate(reversed(resp)):
            if flag == '0':
                continue
            foe = conn[i]
            if foe is None:
                # No connection registered for this link index.
                continue
            edge = foe.getFrom()
            target = foe.getTo()
            laneInd = foe.getFromLane().getIndex()
            intLane = self.map.getLane(foe.getViaLaneID())
            intLaneLgt = intLane.getLength()

            # Cars on the approaching lane that head to the same target.
            approaching = [c for c in self.controller.getCarsOnLane(edge.getID(), laneInd)
                           if c._peekNonSpecialEdge() == target]
            found = self._closestCar(approaching, edge, laneInd, intLaneLgt)
            if found is not None:
                cars.append(found)

            # Cars already on the internal lane of the conflicting link.
            intEdge = intLane.getEdge()
            found = self._closestCar(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()),
                                     intEdge, intLane.getIndex())
            if found is not None:
                cars.append(found)

            # Also look one junction further back: its internal lanes and
            # its incoming edges. Only the closest car of each is tested
            # against the route, as in the original.
            # TODO: the look-ahead offset 3 was picked arbitrarily.
            prevLanesLgt = intLaneLgt + foe.getFromLane().getLength()
            prevNode = edge.getFromNode()
            for intLaneID in prevNode.getInternal():
                prevIntLane = self.map.getLane(intLaneID)
                prevIntEdge = prevIntLane.getEdge()
                found = self._closestCar(self.controller.getCarsOnLane(prevIntEdge.getID(), prevIntLane.getIndex()),
                                         prevIntEdge, prevIntLane.getIndex(), prevLanesLgt)
                if found is None:
                    continue
                cl = found[1]
                if cl._peekNonSpecialEdge() == edge and cl._peekNonSpecialEdge(3) == target:
                    cars.append(found)
            for incEdge in prevNode.getIncoming():
                if incEdge.isSpecial():
                    continue
                incLane = incEdge.getLane(0)
                intConn = incEdge.getConnections(edge)
                viaLgt = 0
                if intConn:
                    viaLgt = self.map.getLane(intConn[0].getViaLaneID()).getLength()
                found = self._closestCar(self.controller.getCarsOnEdge(incEdge.getID()),
                                         incEdge, incLane.getIndex(), prevLanesLgt + viaLgt)
                if found is None:
                    continue
                cl = found[1]
                if cl._peekNonSpecialEdge() == edge and cl._peekNonSpecialEdge(3) == target:
                    cars.append(found)

        if not cars:
            return None
        cDist,closest = min(cars, key=lambda c: c[0])
        return (cDist,closest)

    def turnNext(self):
        """Return True when the current route entry has a turn signal set."""
        return self.cligno[self.index] != self.CLIGNO_NONE

    def nextNonSpecialEdge(self, startOffset = 1):
        """Return the first regular edge at or after index + startOffset.

        Raises StopIteration when only internal edges (or nothing) remain.
        """
        return next(filter(lambda e: not e.isSpecial(), islice(self.route, self.index + startOffset, None)))

    def _peekNonSpecialEdge(self, startOffset = 1):
        """Like nextNonSpecialEdge, but return None instead of raising.

        A StopIteration escaping from a filter() predicate silently ends
        the filtering, so internal callers use this variant.
        """
        upcoming = islice(self.route, self.index + startOffset, None)
        return next((e for e in upcoming if not e.isSpecial()), None)

    def draw(self,painter, colorOverride):
        """Paint the car, its heading, its turn signal and the vroom text.

        When colorOverride is truthy the painter's current pen is kept.
        """
        pt = QPointF(*self.pos)
        if colorOverride:
            pass
        elif self.forceThrough:
            painter.setPen(Qt.blue)
        elif self.isLeader > 0:
            painter.setPen(QColor(255,0,255))
        elif self.leader is None:
            painter.setPen(Qt.gray)
        painter.drawEllipse(pt,self.size,self.size)
        painter.drawLine(self.pos[0], self.pos[1], self.pos[0]+5*cos(self.dir), self.pos[1]+5*sin(self.dir))

        # Turn signal: a small yellow dot on the left or right of the car.
        if self.index < len(self.cligno) and self.cligno[self.index] != self.CLIGNO_NONE:
            painter.save()
            painter.setPen(Qt.yellow)
            d = self.dir + pi/2
            if self.cligno[self.index] == self.CLIGNO_RIGHT:
                d += pi
            painter.drawEllipse(pt + self.size * QPointF(cos(d),sin(d)), 3, 3)
            painter.restore()

        # "vroom" text that drifts and shrinks while self.vroom counts down.
        if self.vroom != 0:
            painter.save()
            d=(60-self.vroom)*0.2
            painter.translate(pt + QPointF(0,d))
            painter.scale(1,-1)
            font = painter.font()
            font.setPixelSize(ceil(self.vroom*0.2))
            painter.setFont(font)
            painter.drawText(QPointF(0,0),"vroom")
            self.vroom -= 1
            painter.restore()

    def conduite(self,vmax,leader,dt):
        """Earlier car-following model; update() uses conduiteKrauss instead.

        Without a leader the speed jumps to vmax. Otherwise the speed is
        the minimum of the accelerated speed, vmax and a safe speed derived
        from the leader's speed and distance, floored at 0.
        """
        if leader is None:
            self.v = vmax
            self.updateGraph(self.v, vmax, 0)
            return
        vleader=leader.v    # speed of the leading car
        bleader=leader.b
        if self.leaderBefore:
            if vleader == 0:
                # Also protects the division by vleader below.
                return
            if self.distToInter > (self.T * vmax) or ((self.distToInter / vmax) < (self.leaderDist / vleader) - 2 * self.T):
                self.v=vmax
            else:
                self.v = 0
            self.updateGraph(self.v, vmax, 0)
            return
        vbar=(self.v+vleader)/2
        bbar=(bleader+self.b)/2
        Si=self.leaderDist-vleader*self.T
        T=self.T
        vsec=vleader+(Si-vmax*T)/(vbar/bbar+T)
        vd=min(self.v+self.a*dt,vmax,vsec)
        self.v=max(0,vd)
        self.updateGraph(self.v, vmax, vsec)

    def conduiteGipps(self, dt):
        """Unfinished Gipps model: computes a value and discards it."""
        Va = self.v + 2.5 * self.a * dt * (1 - self.v/self.vmax) * sqrt(0.025 + self.v/self.vmax)

    def calcTti(self, dist, v0, vmax, a):
        """Return (time to cover `dist`, speed on arrival).

        Assumes constant acceleration `a` from speed v0, capped at vmax.
        Note that the parameter `dist` shadows math.dist inside this method.
        """
        # Time to reach vmax from the current speed.
        ttms = (vmax - v0)/a
        # Discriminant of the quadratic giving the time to cover dist.
        delta = v0**2 + 2*a*dist
        # Time to the intersection and speed there, ignoring vmax.
        tti = (-v0 + sqrt(delta))/a
        sai = v0 + tti*a
        if tti > ttms:
            # vmax is reached first: accelerate up to vmax, then cover the
            # remaining distance at vmax.
            dbvm = a/2 * ttms**2 + v0 * ttms    # distance covered before vmax
            tti = ttms + (dist - dbvm) / vmax
            sai = vmax
        return (tti, sai)

    def conduiteKrauss(self, vmax, leader, dt):
        """Update self.v with a Krauss-style safe-speed rule.

        Without a leader the car accelerates towards vmax. With a leader
        and a conflicting car at an intersection, it decides between
        accelerating to merge first and braking. Otherwise the safe speed
        behind the leader is applied.
        """
        if self.leaderAtInter is None:
            self.forceThrough = False
        if leader is None:
            vd = min(self.v + self.a * dt, vmax)
            self.v = max(0, vd-self.nu)
            return
        vleader = leader.v
        bleader = leader.b

        # Intersection handling.
        # NOTE(review): this branch tests leaderAtInter but uses `leader`
        # and self.leaderDist for all its computations; kept as in the
        # original.
        if self.leaderAtInter is not None:
            if self.v == 0:
                self.timeAtInter += dt
            else:
                self.timeAtInter = 0

            # Estimate our time to the intersection and the leader's.
            lvmax = leader.vmax
            if leader.getCurrentEdge().isSpecial():
                leaderNext = leader._peekNonSpecialEdge()
                if leaderNext is not None:
                    lvmax = leaderNext.getSpeed()
            # Ideally the duration would use the speed of every edge on the
            # way; for now only the next internal edge (usually the
            # slowest) is used.
            nextInternalIndex = self.index
            while not self.route[nextInternalIndex].isSpecial():
                nextInternalIndex += 1
            # TODO: lane id is hard-coded to 0.
            tti, sai = self.calcTti(self.distToInter, self.v, self.route[nextInternalIndex].getLane(0).getSpeed(), self.a)
            ltti, lsai = self.calcTti(self.leaderDist, vleader, lvmax, leader.a)
            # Time during which the leader still accelerates (no relative
            # speed is gained then); assumes leader.a == self.a.
            lta = (lvmax-vleader) / leader.a
            # Margin needed to accelerate after the intersection without
            # being caught up by the leader.
            marg = lta + (lsai-sai) / self.a
            tts = self.v/self.b                         # time to stop if braking now
            dts = self.v*tts - (self.b*tts**2)/2        # distance covered while stopping

            # Break circular waiting chains by forcing this car through.
            if self.timeAtInter >= 0 and self.circularLeaderDep():
                self.forceThrough = True
            if self.forceThrough:
                # Drop the leader so that only the first car of the cycle
                # detects the circular dependency.
                self.leader = None
                self.v = min(vmax, self.v + self.a*dt)
                return

            # Accelerate when still far from the intersection, or when we
            # can get there before the leader with a safety margin;
            # otherwise brake.
            if self.distToInter > self.interMinSpace + dts or (tti + leader.T + marg) < ltti:
                self.v = min(vmax, self.v + self.a*dt)
            else:
                self.v = max(0, self.v - self.b*dt)
            self.updateGraph(self.v, vmax, self.leaderDist)
            return

        vb = (vleader + self.v) / 2
        bb = (bleader + self.b) / 2
        vsec = vleader + (self.leaderDist - vleader * self.T - self.minSpace)/((vb/bb) + self.T)
        vd = min(self.v + self.a * dt, vsec, vmax)
        self.v = max(0, vd-self.nu)
        self.updateGraph(self.v, vmax, vsec)

    def circularLeaderDep(self):
        """Return True if following .leader links leads back to this car.

        At most 4 links are followed.
        """
        l = self.leader
        timeout = 4
        while l is not None and timeout > 0:
            if l.id == self.id:
                return True
            timeout -= 1
            l = l.leader
        return False

    def updateGraph(self, v, vmax, vsec):
        """Send v, vmax and vsec to the info widget graph (series 2, 0, 1)."""
        if self.infoWidg is None:
            return
        self.signals.addGraphPt.emit((2,self.controller.t,v))
        self.signals.addGraphPt.emit((0,self.controller.t,vmax))
        self.signals.addGraphPt.emit((1,self.controller.t,vsec))

    def update(self,dt):
        """Advance the car by one time step of length dt.

        Refreshes the leaders, updates the speed, then moves the car along
        the lane polyline, switching segment and edge as needed. When the
        last route entry is reached the controller is asked to destroy the
        car.
        """
        if self.controller.t < self.startTime:
            return
        self.leader=self.getLeader(100)
        self.leaderAtInter=self.getLeaderAtIntersections(100)
        vmax = self.vmax
        if self.dynSpeed:
            # Scale the limit by the controller's ratio, kept within
            # [8, self.vmax] (the floor of 8 wins when self.vmax is lower).
            vmax = max(min(self.vmax * self.controller.dynSpeedRat, self.vmax), 8)
        self.conduiteKrauss(vmax,self.leader,dt)

        # Statistics.
        if self.v == 0:
            self.timeStopped += dt
        self.speedPercentage += self.v/self.vmax
        self.ticksLived += 1

        lgt=self.v*dt   # distance left to travel during this step
        self.dir = atan2(self.laneShape[self.laneInd+1][1]-self.pos[1],self.laneShape[self.laneInd+1][0]-self.pos[0])
        while lgt>0:
            endPos=self.laneShape[self.laneInd+1]
            l=dist(self.pos,endPos)
            if lgt>=l:
                # The segment end is reached: move on to the next segment.
                lgt-=l
                self.laneInd+=1
                if self.laneInd>=len(self.laneShape)-1:
                    # The lane end is reached: move on to the next edge.
                    self.laneInd=0
                    self.index+=1
                    if self.index>=len(self.route)-1:
                        # End of route; index is reset so the remaining
                        # lookups of this step stay in range.
                        self.index=0
                        self.controller.destroyCar(self)
                    self.initPath()
                self.pos=list(self.laneShape[self.laneInd])
                continue
            adv=lgt/l
            self.pos[0]+=(endPos[0]-self.pos[0])*adv
            self.pos[1]+=(endPos[1]-self.pos[1])*adv
            lgt=0

        if self.controller.vroomEnable and randint(0,100) == 0:
            self.vroom = 60
        if self.infoWidg is None:
            return
        self.signals.updateDisp.emit(("Position", self.pos))
        self.signals.updateDisp.emit(("Vitesse", self.v))
        self.signals.updateDisp.emit(("Leader", f"{self.leader.id if self.leader is not None else 'None'} @ {self.leaderDist:.2f}m : {self.leaderAtInter.id if self.leaderAtInter is not None else 'None'} @ {self.leaderAtInterDist:.2f})"))

    def __copy__(self):
        """Return a new Car sharing route and cligno, with copied pos/shape.

        Dynamic state such as index, laneInd and v is not copied; the new
        car starts from the __init__ defaults.
        """
        copy = Car(self.id, self.rawRoute, self.startTime, self.dynSpeed, self.IA, self.map, self.controller, self.infoWidg)
        copy.route = self.route
        copy.cligno = self.cligno
        copy.pos = self.pos.copy()
        # laneShape is still None before prepareRoute() has run.
        copy.laneShape = self.laneShape.copy() if self.laneShape is not None else None
        copy.laneId = self.laneId
        copy.vmax = self.vmax
        return copy