IRESTE/Car.py
2022-05-23 22:24:59 +02:00

581 lines
23 KiB
Python

import sumolib
from math import dist, ceil, sqrt, log, cos, sin, atan2, pi
from random import randint, uniform
from itertools import islice
import time
from globalImport import globalImport
def pysideImports():
    """Load the PySide6 names used by this module on first GUI use.

    The Qt names are requested through ``globalImport`` with this module's
    ``globals()`` (presumably injecting them there), so that importing the
    module does not itself require PySide6.  ``updateSignals`` derives from
    ``QObject`` and can therefore only be defined once those names exist.
    It is published at module level with ``global`` because ``Car.__init__``
    instantiates it by its global name.
    """
    global updateSignals

    globalImport(globals(), "PySide6.QtGui", ["QPainter", "QColor"])
    globalImport(globals(), "PySide6.QtCore", ["QPointF", "Signal", "QObject", "Qt"])

    class updateSignals(QObject):
        """Signals a car uses to push values to its info widget."""
        updateDisp = Signal(tuple)
        addGraphPt = Signal(tuple)
class Car():
    """A simulated vehicle that drives along a sequence of network edges."""
    # Turn-signal ("clignotant") states; one entry per route edge is kept in
    # self.cligno.
    CLIGNO_NONE = 0
    CLIGNO_LEFT = 1
    CLIGNO_RIGHT = 2
def getShape(self, edgeInd):
    """Return ``(shape, speed, laneIndex)`` for the route edge at *edgeInd*.

    Lane index 0 of the edge is always the one used.
    """
    chosenLane = 0
    laneObj = self.route[edgeInd].getLane(chosenLane)
    speedLimit = laneObj.getSpeed()
    return (laneObj.getShape(), speedLimit, chosenLane)
def initPath(self):
    """Load the geometry of the current route edge into the car.

    Refreshes ``laneShape``, ``vmax`` and ``laneId`` from ``getShape`` for
    ``self.index``.  When an info widget is attached, the route progress and
    the current edge are also published through ``updateDisp``.
    """
    laneInfo = self.getShape(self.index)
    self.laneShape = laneInfo[0]
    self.vmax = laneInfo[1]
    self.laneId = laneInfo[2]
    if self.infoWidg is not None:
        progress = f"{self.index}/{len(self.route)}"
        self.signals.updateDisp.emit(("Index", progress))
        self.signals.updateDisp.emit(("Edge", self.route[self.index]))
def __init__(self,carID,route,startTime,dynSpeed,IA,parentMap,parentController,infoWidg):
    """Create a car that has not been placed on the network yet.

    Args:
        carID: identifier stored as ``self.id``.
        route: raw route, kept as ``self.rawRoute`` until ``prepareRoute``
            resolves it.
        startTime: converted with ``float`` and stored as ``self.startTime``.
        dynSpeed, IA: flags; a string is true only when it equals ``'1'``,
            any other value is stored unchanged.
        parentMap, parentController: stored as ``self.map`` and
            ``self.controller``.
        infoWidg: optional widget; when given, the Qt signals are created,
            connected to its ``setVal`` / ``addSpeedPoint`` and the initial
            values are emitted.
    """
    def asFlag(raw):
        # Strings are compared with '1'; anything else is kept as given.
        if isinstance(raw, str):
            return raw == '1'
        return raw

    # Identity and collaborators
    self.id = carID
    self.map = parentMap
    self.controller = parentController
    self.infoWidg = infoWidg
    self.rawRoute = route
    self.startTime = float(startTime)
    self.dynSpeed = asFlag(dynSpeed)
    self.IA = asFlag(IA)

    # Route progress and geometry
    self.route = []
    self.cligno = []
    self.index = 0
    self.laneInd = 0
    self.laneId = 0
    self.laneShape = None
    self.pos = [0, 0]
    self.dir = 0

    # Leader bookkeeping
    self.leader = None
    self.leaderAtInter = None
    self.leaderDist = 0
    self.leaderAtInterDist = 0
    self.distToInter = 0
    self.isLeader = 0
    self.leaderStopped = 0
    self.forceThrough = False

    # Speed state and driver-model parameters
    self.v = 0
    self.vmax = 0
    self.a = 10
    self.b = 20
    self.minSpace = 10
    self.interMinSpace = 10
    self.alpha = 0.1
    self.beta = 0.1
    self.nu = 0
    self.gamma = 10
    self.delta = 0
    # Fixed for IA cars, drawn at random otherwise.
    self.T = 0.01 if self.IA else uniform(0.9, 1.6)

    # Statistics
    self.timeStopped = 0
    self.timeAtInter = 0
    self.speedPercentage = 0
    self.ticksLived = 0

    # Display
    self.size = 3
    self.vroom = 0
    self.imported = False
    if infoWidg is not None:
        pysideImports()
        self.imported = True
        self.signals = updateSignals()
        self.signals.updateDisp.connect(self.infoWidg.setVal)
        self.signals.addGraphPt.connect(self.infoWidg.addSpeedPoint)
        self.signals.updateDisp.emit(("Position", self.pos))
        self.signals.updateDisp.emit(("Vitesse", self.v))
        self.signals.updateDisp.emit(("Index", f"{self.index}/{len(route)}"))
def prepareRoute(self):
    """Expand ``rawRoute`` into the edge sequence the car will drive.

    Every entry of ``rawRoute`` is resolved through ``self.map.getEdge``.
    For each consecutive pair of edges the first one is appended to
    ``self.route``, followed by the internal edges reached by following the
    via-lane IDs of the connection towards the second edge.  ``self.cligno``
    receives exactly one turn-signal entry per appended edge, so both lists
    can be indexed with ``self.index``.  Finally the geometry of the first
    edge is loaded and the car is placed on its first shape point.

    Returns early, after printing a message, when an edge cannot be
    resolved.

    NOTE(review): the last edge of ``rawRoute`` is never appended to
    ``self.route`` (the pairwise loop stops one short) -- confirm that this
    is intended.
    """
    route = [self.map.getEdge(edgeId) for edgeId in self.rawRoute]
    if None in route:
        print("error planning route, mismatched net/demand?")
        return
    turnSignals = {"l": self.CLIGNO_LEFT, "r": self.CLIGNO_RIGHT}
    for r, rn in zip(route, route[1:]):
        self.route.append(r)
        conn = r.getConnections(rn)
        if not conn:
            # No connection known towards the next edge: still record a
            # signal state so that cligno stays aligned with route.
            self.cligno.append(self.CLIGNO_NONE)
            continue
        self.cligno.append(turnSignals.get(conn[0].getDirection(), self.CLIGNO_NONE))
        # Internal edges are sometimes split in several pieces while the
        # connection is still recorded, so follow the via-lane chain until
        # it ends.
        viaLaneId = conn[0].getViaLaneID()
        while viaLaneId:
            edge = self.map.getLane(viaLaneId).getEdge()
            self.route.append(edge)
            self.cligno.append(self.CLIGNO_NONE)
            onward = edge.getConnections(rn)
            # An internal edge without onward connection ends the chain
            # instead of raising IndexError.
            viaLaneId = onward[0].getViaLaneID() if onward else ""
    self.initPath()
    self.pos = list(self.laneShape[0])
def getLeader(self, maxDist):
    """Find the closest car ahead of this one along its route.

    Walks the shape segments of the current lane, then those of the
    following route edges, accumulating the distance covered in ``l``.  On
    each segment, the cars the controller reports for that edge/lane whose
    ``laneInd`` equals the segment index are candidates; on the first
    segment only cars closer to the segment end than this car are kept.

    Returns the candidate closest to the segment start and stores its
    distance in ``self.leaderDist`` when that distance is within *maxDist*.
    Returns ``None`` otherwise, and also when the walk reaches the last
    route edge.
    """
    shapeInd = self.laneInd
    edgeInd = self.index
    prevInd = edgeInd
    laneShape = self.laneShape
    laneId = self.laneId
    # Distance walked so far; l == 0 doubles as "still on the car's own
    # segment" marker below.
    l = 0
    carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
    carsHere = list(filter(lambda c: c.id != self.id, carsHere))
    while(l<maxDist):
        endPos = laneShape[shapeInd+1]
        carsReallyHere = filter(lambda c: c.laneInd == shapeInd, carsHere)
        if l == 0:
            # On our own segment, ignore cars that are behind us.
            carsReallyHere = filter(lambda c: dist(c.pos,endPos) < dist(self.pos,endPos),carsReallyHere)
        closest = None
        carsReallyHere = list(carsReallyHere)
        try:
            closest = min(carsReallyHere, key=lambda c: dist(c.pos,laneShape[shapeInd]))
        except ValueError as e:
            # min() of an empty list: nobody on this segment, move on to the
            # next one.
            if l == 0:
                l+=dist(self.pos,endPos)
            else:
                l+=dist(laneShape[shapeInd],endPos)
            shapeInd+=1
            if(shapeInd>=len(laneShape)-1):
                # End of this lane's shape: continue on the next route edge.
                shapeInd = 0
                edgeInd+=1
                if(not self.route[edgeInd].isSpecial()):
                    prevInd = edgeInd
                # NOTE(review): laneId still holds the previous edge's lane
                # index here; getShape always returns 0 for it.
                carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
                carsHere = list(filter(lambda c: c.id != self.id, carsHere)) # original author's note: unless this is converted to a list here, the original filter ends up modified
                if(edgeInd>=len(self.route)-1):
                    return
                newLane = self.getShape(edgeInd)
                laneShape = newLane[0]
                laneId = newLane[2]
        else:
            # A car was found on this segment.
            if l == 0:
                l+=dist(self.pos, closest.pos)
            else:
                l+=dist(laneShape[shapeInd], closest.pos)
            if l <= maxDist:
                self.leaderDist=l
                return closest
            else:
                return
    return
def getCurrentEdge(self):
    """Return the route edge the car is currently on."""
    currentIndex = self.index
    return self.route[currentIndex]
def getCarDist(self, car, edge, laneInd, startFromEnd = True):
    """Return the distance along a lane between one of its ends and *car*.

    The shape of lane *laneInd* of *edge* is walked segment by segment,
    from its last point when *startFromEnd* is true and from its first
    point otherwise.  Whole segments are added until the segment whose
    index equals ``car.laneInd`` is reached; for that one only the stretch
    up to ``car.pos`` counts.  When no segment matches, the summed length of
    all segments is returned.
    """
    points = edge.getLane(laneInd).getShape().copy()
    if startFromEnd:
        points.reverse()
    lastSegment = len(points) - 2
    travelled = 0
    for step, (here, there) in enumerate(zip(points, points[1:])):
        # Segment index expressed in the lane's original orientation.
        segment = lastSegment - step if startFromEnd else step
        if segment == car.laneInd:
            travelled += dist(here, car.pos)
            return travelled
        travelled += dist(here, there)
    return travelled
def getDistToEndOfEdge(self):
    """Return the remaining length of the current lane shape.

    This is the distance from ``self.pos`` to the next shape point plus the
    lengths of all shape segments after it.
    """
    nextInd = self.laneInd + 1
    path = [self.pos, self.laneShape[nextInd], *self.laneShape[nextInd + 1:]]
    remaining = 0
    for here, there in zip(path, path[1:]):
        remaining += dist(here, there)
    return remaining
def getLeaderAtIntersections(self, maxDist):
    """Look for a car to yield to at the upcoming intersections.

    Starting from the end of the current edge, each following route edge is
    probed with ``getLeaderAtIntersection`` while the distance covered stays
    below *maxDist*.  On the first hit, ``self.distToInter`` receives the
    distance covered so far, ``self.leaderAtInterDist`` the distance reported
    for the other car, and that car is returned.  Returns ``None`` when
    nothing is found within range or the route ends.
    """
    covered = self.getDistToEndOfEdge()
    probeInd = self.index + 1
    while covered < maxDist:
        found = self.getLeaderAtIntersection(probeInd - 1, probeInd)
        if found is not None:
            self.distToInter = covered
            self.leaderAtInterDist = found[0]
            return found[1]
        covered += self.route[probeInd].getLength()
        probeInd += 1
        if probeInd >= len(self.route):
            break
    return None
def getLeaderAtIntersection(self, prevInd, edgeInd):
    """Find the closest car this one has to yield to at one intersection.

    *edgeInd* is the route index of the edge after the intersection and
    *prevInd* the index just before it; *prevInd* is moved back to the
    nearest non-special edge.  The connection between those two edges gives
    a link index on the junction, and the junction's ``_prohibits`` entry
    for that link is read as one flag per link (presumably the links that
    take precedence over ours -- TODO confirm against sumolib).  For every
    flagged link, cars are searched on its origin lane, on its internal
    lane, and on the internal lanes and incoming edges of the node before.

    Returns ``(distance, car)`` for the closest car found, or ``None`` when
    there is none or when the intersection cannot be resolved.
    """
    # Fetch the edges right before and right after the intersection.
    # NOTE(review): the ``return`` at the top of this loop makes the rest of
    # its body unreachable; a special edge at edgeInd simply yields None.
    while(self.route[edgeInd].isSpecial()):
        return
        edgeInd = edgeInd + 1
        if edgeInd >= len(self.route):
            print(self.id,"fu")
            return None
    while self.route[prevInd].isSpecial():
        prevInd -= 1
        if prevInd < 0:
            print(self.id, "fu2")
            return None
    # Get the connections between the previous edge and the current one and
    # keep only the first; give up if there is none.
    inter = self.route[edgeInd-1].getFromNode()
    connections = self.route[prevInd].getConnections(self.route[edgeInd])
    if(len(connections)==0):
        print("pas de connections")
        return None
    connection = connections[0]
    # Fetch the priority matrix row for our link.
    linkInd = inter.getLinkIndex(connection)
    if(linkInd == -1): # Should not happen, but evidently it does
        print(self.id, "fu3")
        return;
    resp = inter._prohibits[linkInd] # NOTE: relies on a private (underscore-prefixed) attribute of the node
    # Build a lookup from link index to connection.
    connRaw = inter.getConnections()
    conn = [0] * len(resp)
    for c in connRaw:
        ind = inter.getLinkIndex(c)
        if(ind == -1):
            continue
        conn[ind] = c
    # For each link we do not have priority over, look for cars and, as
    # usual, keep the closest one.
    cars = []
    # resp is walked in reverse so that position i matches link index i.
    for i,f in enumerate(reversed(resp)):
        if(f == '0'):
            continue
        edge = conn[i].getFrom()
        laneInd = conn[i].getFromLane().getIndex()
        intLane = self.map.getLane(conn[i].getViaLaneID())
        intLaneLgt = intLane.getLength()
        # Cars on the link's origin lane that are heading for its target edge.
        carsInEdge = self.controller.getCarsOnLane(edge.getID(), laneInd)
        carsInEdge = list(filter(lambda c: c.nextNonSpecialEdge() == conn[i].getTo(), carsInEdge))
        if len(carsInEdge) != 0:
            carsInEdge = zip([self.getCarDist(c, edge, laneInd)+intLaneLgt for c in carsInEdge], carsInEdge)
            closest = min(carsInEdge, key=lambda c: c[0])
            cars.append(closest)
        # Cars already on the link's internal lane.
        intEdge = intLane.getEdge()
        carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
        if len(carsInEdge) != 0:
            carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex()) for c in carsInEdge], carsInEdge)
            closest = min(carsInEdge, key=lambda c: c[0])
            cars.append(closest)
        # Also search the internal edges of the previous node and the edges
        # leading into it.
        prevLanesLgt = intLaneLgt + conn[i].getFromLane().getLength()
        prevNode = edge.getFromNode()
        for intLaneID in prevNode.getInternal():
            intLane = self.map.getLane(intLaneID)
            intEdge = intLane.getEdge()
            carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex())+prevLanesLgt for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cl = closest[1]
                if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo(): # TODO: the look-ahead offset is arbitrary (the original note mentions 10)
                    cars.append(closest)
        for incEdge in prevNode.getIncoming():
            if incEdge.isSpecial():
                continue
            incLane = incEdge.getLane(0)
            intConn = incEdge.getConnections(edge)
            intLaneLgt = 0
            if len(intConn) != 0:
                intLaneLgt = self.map.getLane(intConn[0].getViaLaneID()).getLength()
            carsInEdge = list(self.controller.getCarsOnEdge(incEdge.getID()))
            #carsInEdge = list(filter(lambda c: c.nextNonSpecialEdge() == edge and c.nextNonSpecialEdge(3) == conn[i].getTo() and (c.leader is None or c.leader.getCurrentEdge().getID() != c.getCurrentEdge().getID()), carsInEdge))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, incEdge, incLane.getIndex())+prevLanesLgt+intLaneLgt for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cl = closest[1]
                if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo(): # TODO: same as above (these two loops should be merged)
                    cars.append(closest)
    if(len(cars) == 0):
        return None
    cDist,closest = min(cars, key=lambda c: c[0])
    return (cDist,closest)
def turnNext(self):
    """Tell whether a turn signal is set for the current route edge."""
    signal = self.cligno[self.index]
    return signal != self.CLIGNO_NONE
def nextNonSpecialEdge(self, startOffset = 1):
    """Return the first non-special route edge from ``index + startOffset`` on.

    Returns ``None`` when the rest of the route holds no such edge.
    """
    for candidate in islice(self.route, self.index + startOffset, None):
        if not candidate.isSpecial():
            return candidate
    return None
def draw(self,painter, colorOverride):
    """Paint the car with *painter*.

    Draws a circle of radius ``self.size`` at ``self.pos``, a short line in
    the direction ``self.dir``, a yellow dot on one side when a turn signal
    is set for the current edge, and the fading "vroom" text while
    ``self.vroom`` is non-zero (it is decremented on every call).

    Unless *colorOverride* is truthy, the pen is set to blue when the car is
    forcing its way through, magenta when ``isLeader`` is positive and grey
    when it has no leader; otherwise the painter's current pen is kept.
    """
    if not self.imported:
        pysideImports()
        self.imported = True
    pt = QPointF(*self.pos)
    if colorOverride:
        pass
    elif self.forceThrough:
        painter.setPen(Qt.blue)
    elif self.isLeader > 0:
        painter.setPen(QColor(255,0,255))
    elif self.leader is None:
        painter.setPen(Qt.gray)
    painter.drawEllipse(pt,self.size,self.size)
    # Use the QPointF overload so the heading line starts exactly at the
    # circle centre; the four-scalar call was handed float coordinates.
    painter.drawLine(pt, pt + 5 * QPointF(cos(self.dir), sin(self.dir)))
    signal = self.cligno[self.index] if self.cligno else self.CLIGNO_NONE
    if signal != self.CLIGNO_NONE:
        painter.save()
        painter.setPen(Qt.yellow)
        # The dot sits perpendicular to the heading; the right signal goes
        # on the opposite side.
        d = self.dir + pi/2
        if signal == self.CLIGNO_RIGHT:
            d += pi
        painter.drawEllipse(pt + self.size * QPointF(cos(d),sin(d)), 3, 3)
        painter.restore()
    if self.vroom != 0:
        painter.save()
        # The text drifts away and shrinks as the counter runs down from 60.
        d = (60-self.vroom)*0.2
        painter.translate(pt + QPointF(0,d))
        # Flip vertically before drawing the text.
        painter.scale(1,-1)
        font = painter.font()
        font.setPixelSize(ceil(self.vroom*0.2))
        painter.setFont(font)
        painter.drawText(QPointF(0,0),"vroom")
        self.vroom -= 1
        painter.restore()
def calcTti(self, dist, v0, vmax, a):
    """Estimate the time needed to cover *dist* and the speed reached there.

    The car starts at speed *v0*, accelerates constantly at *a* and is
    capped at *vmax*.  A car that is already at or above *vmax* is treated
    as cruising at *vmax*.

    Returns:
        ``(tti, sai)``: "time to intersection" and "speed at intersection".
    """
    # "time to max speed": time needed to reach vmax from the current speed.
    # Clamped at 0, since a negative duration would corrupt the cruise
    # branch below when v0 > vmax.
    ttms = max(0, (vmax - v0)/a)
    # Discriminant of the quadratic giving the time to cover dist.
    delta = v0**2 + 2*a*dist
    # "time to intersection" if there were no speed cap.
    tti = (-v0 + sqrt(delta))/a
    # "speed at intersection" on arrival.
    sai = v0 + tti*a
    if tti > ttms:
        # vmax is reached before the intersection: accelerate for ttms, then
        # cover the rest at vmax.
        dbvm = a/2 * ttms**2 + v0 * ttms  # "distance before vmax"
        tti = ttms + (dist - dbvm) / vmax
        sai = vmax
    return (tti, sai)
def conduiteKrauss(self, vmax, leader, leaderAtInter, dt):
    """Compute and apply the car's speed for one step of length *dt*.

    The new ``self.v`` is the smallest of: the current speed plus
    ``a * dt``, *vmax*, a safe speed behind *leader* (``vsec``) and a speed
    dictated by the upcoming intersection (``vsecInter``), minus ``nu`` and
    floored at 0.  The method name suggests the Krauss car-following model
    (presumably the source of the ``vsec`` formula).

    Args:
        vmax: speed cap for this step.
        leader: car ahead on the route, or ``None``.
        leaderAtInter: car to yield to at the next intersection, or ``None``.
        dt: step length.

    Besides ``self.v`` this updates ``self.timeAtInter`` and
    ``self.forceThrough`` and, except on the free-road path, forwards the
    result to ``updateGraph``.
    """
    # Debug switch, permanently disabled by the leading ``False``.
    if False and ("f_0" in self.id or "f_1" in self.id or "f_2" in self.id):
        self.v = 0
        return
    if leaderAtInter is None:
        self.forceThrough = False
    # Free road: just accelerate towards vmax.
    if leader is None and leaderAtInter is None:
        vd = min(self.v + self.a * dt, vmax)
        self.v = max(0, vd-self.nu)
        return
    vsecInter = vmax
    # if we are at an intersection
    if(leaderAtInter is not None):
        # Track how long we have been standing still for this intersection.
        if self.v == 0:
            self.timeAtInter += dt
        else:
            self.timeAtInter = 0
        vleader = leaderAtInter.v
        # compute the time we need to reach the intersection
        # and the time the leader needs
        lvmax = leaderAtInter.vmax
        if leaderAtInter.getCurrentEdge().isSpecial():
            lvmax = leaderAtInter.nextNonSpecialEdge().getSpeed()
        nextInternalIndex = self.index # For this car, ideally the duration would be computed from the speed on each stretch
        while not self.route[nextInternalIndex].isSpecial(): # but for now only the speed on the internal stretch is used (usually the slowest)
            nextInternalIndex += 1
        tti, sai = self.calcTti(self.distToInter, self.v, self.route[nextInternalIndex].getSpeed(), self.a)
        ltti, lsai = self.calcTti(self.leaderAtInterDist, vleader, lvmax, leaderAtInter.a)
        lta = (lvmax-vleader) / leaderAtInter.a # time during which the leader accelerates (i.e. we gain no relative speed) (assumes leader.a==self.a)
        marg = lta + (lsai-sai) / self.a # margin needed to accelerate after the intersection without the leader catching up
        tts = self.v/self.b # time to stop if we brake now
        dts = self.v*tts - (self.b*tts**2)/2 # distance to stop: distance covered during tts while braking
        #print(self.distToInter, self.minSpace, dts)
        # If we are stuck in a circular dependency
        maxInterTime = 0
        # NOTE(review): from here on self.leader / self.leaderAtInter are read
        # instead of the parameters; update() passes the same objects.
        if self.leader is None and self.timeAtInter >= 0:
            cd, maxInterTime = self.circularLeaderDep(self.leaderAtInter, 0, 20)
            if cd and self.timeAtInter >= maxInterTime:
                self.forceThrough = True
        # Or if our leader is itself stuck behind another leader
        if self.leader is None and self.leaderAtInter.v == 0 and self.leaderAtInter.leaderAtInter is not None and self.leaderAtInter.leaderAtInterDist > self.interMinSpace and maxInterTime != -1:
            self.forceThrough = True
        # if we are far enough from the intersection (i.e. the leader does not matter)
        # or if we have time to reach the intersection before the leader (plus a margin to keep a safety distance)
        # then accelerate to merge
        #print(tti, leader.T, marg, ltti)
        if self.distToInter > self.route[nextInternalIndex].getLength() + self.minSpace + dts or (tti + leaderAtInter.T + 0.1 + marg) < ltti:
            vsecInter = min(vmax, self.v + self.a*dt)
            #print(self.id, "ca passe")
        else:# otherwise brake
            vsecInter = max(0, self.v - self.b*dt)
        # Forcing through overrides the decision above.
        if self.forceThrough:
            vsecInter = min(vmax, self.v + self.a*dt)
    vsec = vmax
    if leader is not None:
        vleader = leader.v
        bleader = leader.b
        # Mean speed and mean braking capability of the two cars.
        vb = (vleader + self.v) / 2
        bb = (bleader + self.b) / 2
        vsec = vleader + (self.leaderDist - vleader * self.T - self.minSpace)/((vb/bb) + self.T)
    vd = min(self.v + self.a * dt, vsec, vmax, vsecInter)
    self.v = max(0, vd-self.nu)
    self.updateGraph(self.v, vmax, vsec, vsecInter)
# Check whether following the chain of leaders loops back to this car (circular leader dependency)
def circularLeaderDep(self, car, maxTimeStopped, timeout):
    """Search the leader graph from *car* for a path back to this car.

    Both ``leader`` and ``leaderAtInter`` of every visited car are followed,
    at most *timeout* levels deep.

    Returns:
        ``(found, waited)``.  *found* tells whether a path back to this car
        exists.  *waited* is the larger of the two values reported by the
        branches, where a branch that reaches this car reports the largest
        ``timeAtInter`` seen on its path (starting from *maxTimeStopped*)
        and a dead end reports 0.  ``(False, -1)`` is returned as soon as a
        visited car has ``forceThrough`` set.
    """
    if timeout <= 0 or car is None:
        return False, 0
    if car.forceThrough:
        return False, -1
    longestWait = max(maxTimeStopped, car.timeAtInter)
    if car.id == self.id:
        return True, longestWait
    budget = timeout - 1
    viaLeader, waitLeader = self.circularLeaderDep(car.leader, longestWait, budget)
    viaInter, waitInter = self.circularLeaderDep(car.leaderAtInter, longestWait, budget)
    # A forcing car anywhere below cancels the whole search.
    if min(waitLeader, waitInter) == -1:
        return False, -1
    return (viaLeader or viaInter), max(waitLeader, waitInter)
def updateGraph(self, v, vmax, vsec, interVsec):
    """Send the four speed values of this step to the info widget's graph.

    Does nothing without an info widget.  Each value is emitted through
    ``addGraphPt`` as ``(series, controller time, value)`` with series 2 for
    *v*, 0 for *vmax*, 1 for *vsec* and 3 for *interVsec*, in that order.
    """
    if self.infoWidg is None:
        return
    for series, value in ((2, v), (0, vmax), (1, vsec), (3, interVsec)):
        self.signals.addGraphPt.emit((series, self.controller.t, value))
def update(self,dt):
    """Advance the car by one simulation step of length *dt*.

    Does nothing before ``self.startTime``.  Otherwise the leaders are
    looked up (within 100 units), the speed is updated through
    ``conduiteKrauss``, the statistics counters are updated and the car is
    moved ``v * dt`` along its lane shape, switching to the next route edge
    when a shape is exhausted.  On reaching the last route edge the car is
    handed to ``controller.destroyCar`` and its index is reset to 0.  When
    an info widget is attached, position, speed and leader information are
    published at the end.
    """
    if self.controller.t < self.startTime:
        return
    self.leader = self.getLeader(100)
    self.leaderAtInter = self.getLeaderAtIntersections(100)
    vmax = self.vmax
    if self.dynSpeed:
        # Scale the limit by the controller's ratio, never above the lane
        # limit and never below 8.
        vmax = max(min(self.vmax * self.controller.dynSpeedRat, self.vmax), 8)
    self.conduiteKrauss(vmax, self.leader, self.leaderAtInter, dt)
    if self.v == 0:
        self.timeStopped += dt
    self.speedPercentage += self.v/self.vmax
    self.ticksLived += 1
    # Distance still to be covered during this step.
    lgt = self.v*dt
    # Heading towards the next shape point, taken before moving.
    target = self.laneShape[self.laneInd+1]
    self.dir = atan2(target[1]-self.pos[1], target[0]-self.pos[0])
    while lgt > 0:
        endPos = self.laneShape[self.laneInd+1]
        l = dist(self.pos, endPos)
        if lgt >= l:
            # The segment end is reached: jump to the next segment, edge or
            # end of route and keep consuming the remaining distance.
            lgt -= l
            self.laneInd += 1
            if self.laneInd >= len(self.laneShape)-1:
                self.laneInd = 0
                self.index += 1
                if self.index >= len(self.route)-1:
                    self.index = 0
                    # NOTE(review): the loop keeps running after this call,
                    # moving the car from the start of its route -- confirm
                    # that this is what the controller expects.
                    self.controller.destroyCar(self)
                self.initPath()
            self.pos = list(self.laneShape[self.laneInd])
            continue
        # Move part of the way along the current segment.
        adv = lgt/l
        self.pos[0] += (endPos[0]-self.pos[0])*adv
        self.pos[1] += (endPos[1]-self.pos[1])*adv
        lgt = 0
    if self.controller.vroomEnable and randint(0,100) == 0:
        self.vroom = 60
    if self.infoWidg is None:
        return
    self.signals.updateDisp.emit(("Position", self.pos))
    self.signals.updateDisp.emit(("Vitesse", self.v))
    self.signals.updateDisp.emit(("Leader", f"{self.leader.id if self.leader is not None else 'None'} @ {self.leaderDist:.2f}m : {self.leaderAtInter.id if self.leaderAtInter is not None else 'None'} @ {self.leaderAtInterDist:.2f})"))
def __copy__(self):
    """Return a new ``Car`` built from this car's constructor arguments.

    The clone shares ``route`` and ``cligno`` with the original, gets its
    own copies of ``pos`` and ``laneShape``, and takes over ``laneId`` and
    ``vmax``.  Every other attribute keeps the value set by ``__init__``.
    """
    clone = Car(self.id, self.rawRoute, self.startTime, self.dynSpeed,
                self.IA, self.map, self.controller, self.infoWidg)
    clone.route, clone.cligno = self.route, self.cligno
    clone.pos = self.pos.copy()
    clone.laneShape = self.laneShape.copy()
    clone.laneId, clone.vmax = self.laneId, self.vmax
    return clone