IRESTE/Car.py
2022-05-14 17:01:21 +02:00

557 lines
22 KiB
Python

import sumolib
from math import dist, ceil, sqrt, log, cos, sin, atan2, pi
from random import randint, uniform
from PySide6.QtGui import QPainter, QColor
from PySide6.QtCore import QPointF, Signal, QObject, Qt
from itertools import islice
import time
class updateSignals(QObject):
    """Qt signal holder a ``Car`` uses to push values to its info widget."""
    # Emitted with a tuple payload; ``Car`` sends ``(label, value)`` pairs.
    updateDisp = Signal(tuple)
    # Emitted with a tuple payload; ``Car`` sends ``(series, time, value)`` triples.
    addGraphPt = Signal(tuple)
class Car:
    """A simulated vehicle that drives along a route of network edges."""

    # Turn-signal ("clignotant") states; one is stored per route edge in
    # the ``cligno`` list.
    CLIGNO_NONE, CLIGNO_LEFT, CLIGNO_RIGHT = range(3)
def getShape(self, edgeInd):
    """Return ``(shape, speed limit, lane index)`` for route edge *edgeInd*.

    Lane 0 of the edge is always the one used.
    """
    lane_index = 0
    lane = self.route[edgeInd].getLane(lane_index)
    speed_limit = lane.getSpeed()
    return (lane.getShape(), speed_limit, lane_index)
def initPath(self):
    """Load the lane data of the current route edge into the car.

    Refreshes ``laneShape``, ``vmax`` and ``laneId`` from ``getShape`` and,
    when an info widget is attached, publishes the route progress and the
    current edge through ``updateDisp``.
    """
    self.laneShape, self.vmax, self.laneId = self.getShape(self.index)
    if self.infoWidg is not None:
        progress = f"{self.index}/{len(self.route)}"
        self.signals.updateDisp.emit(("Index", progress))
        self.signals.updateDisp.emit(("Edge", self.route[self.index]))
def __init__(self,carID,route,startTime,dynSpeed,IA,parentMap,parentController,infoWidg):
    """Create a car that has not yet been placed on the network.

    Args:
        carID: identifier of the car.
        route: raw route, stored as ``rawRoute`` and resolved later by
            ``prepareRoute``.
        startTime: controller time before which ``update`` does nothing;
            converted with ``float``.
        dynSpeed: flag, or the string ``'1'`` for true, enabling the
            controller-driven speed limit.
        IA: flag, or the string ``'1'`` for true; selects the short
            0.01 headway instead of a random one.
        parentMap: network object used to look up edges and lanes.
        parentController: simulation controller owning the car.
        infoWidg: optional info widget; when ``None`` no signals are created.
    """
    # String flags (as read from a file) become booleans; anything else is
    # kept untouched.
    if isinstance(dynSpeed, str):
        dynSpeed = dynSpeed == '1'
    if isinstance(IA, str):
        IA = IA == '1'

    # Identity and collaborators.
    self.id = carID
    self.map = parentMap
    self.controller = parentController
    self.infoWidg = infoWidg

    # Position along the route: edge index, segment index inside the lane
    # shape, and the lane data filled in by ``initPath``.
    self.index = 0
    self.laneInd = 0
    self.route = []
    self.laneShape = None
    self.laneId = 0

    # Cars currently constraining this one, and the distances to them.
    self.leader = None
    self.leaderAtInter = None
    self.leaderDist = 0
    self.leaderAtInterDist = 0

    self.startTime = float(startTime)
    self.dynSpeed = dynSpeed
    self.IA = IA
    self.cligno = []
    self.isLeader = 0
    self.leaderStopped = 0

    # Kinematic state.
    self.pos = [0, 0]
    self.dir = 0
    self.v = 0
    self.a = 10  # acceleration applied per unit of time when speeding up
    self.b = 20  # deceleration applied per unit of time when braking
    self.minSpace = 10  # gap subtracted from the leader distance
    self.interMinSpace = 20  # extra margin used before an intersection
    self.distToInter = 0

    # Statistics and intersection bookkeeping.
    self.timeStopped = 0
    self.speedPercentage = 0
    self.ticksLived = 0
    self.timeAtInter = 0
    self.forceThrough = False
    self.vmax = 0

    # Model parameters; of these only ``nu`` (subtracted from the computed
    # speed) is referenced by the methods visible in this file.
    self.alpha = 0.1
    self.beta = 0.1
    self.nu = 0
    self.gamma = 10
    self.delta = 0

    # Headway time used by the car-following rule.
    if self.IA:
        self.T = 0.01
    else:
        self.T = uniform(0.9, 1.6)

    self.size = 3  # radius used when drawing the car
    self.vroom = 0  # countdown for the "vroom" text effect
    self.rawRoute = route

    if infoWidg is not None:
        self.signals = updateSignals()
        self.signals.updateDisp.connect(self.infoWidg.setVal)
        self.signals.addGraphPt.connect(self.infoWidg.addSpeedPoint)
        self.signals.updateDisp.emit(("Position", self.pos))
        self.signals.updateDisp.emit(("Vitesse", self.v))
        self.signals.updateDisp.emit(("Index", f"{self.index}/{len(route)}"))
def prepareRoute(self):
    """Resolve ``rawRoute`` into ``route`` and place the car at its start.

    Every raw edge id is looked up on the map. For each consecutive pair
    the first edge is appended to ``route`` followed by the internal edges
    that link it to the next one, and ``cligno`` receives exactly one
    turn-signal entry per appended edge so both lists can be indexed with
    ``self.index``.

    If any edge id is unknown, a message is printed and the car is left
    unprepared (``route`` stays empty).

    NOTE(review): the last raw edge is only used as a connection target; it
    is never appended to ``route`` itself.
    """
    edges = [self.map.getEdge(edge_id) for edge_id in self.rawRoute]
    if None in edges:
        print("error planning route, mismatched net/demand?")
        return

    blinker_for = {"l": self.CLIGNO_LEFT, "r": self.CLIGNO_RIGHT}
    for edge, following in zip(edges, edges[1:]):
        self.route.append(edge)
        links = edge.getConnections(following)
        if not links:
            # No known connection: still record a turn-signal entry so that
            # ``cligno`` stays aligned with ``route``.
            self.cligno.append(self.CLIGNO_NONE)
            continue
        self.cligno.append(
            blinker_for.get(links[0].getDirection(), self.CLIGNO_NONE)
        )

        # Follow the chain of internal lanes up to the next regular edge.
        # Internal edges are sometimes split in the network while the
        # connection is still recorded, hence the loop instead of one hop.
        via = links[0].getViaLaneID()
        while True:
            inner = self.map.getLane(via).getEdge()
            self.route.append(inner)
            self.cligno.append(self.CLIGNO_NONE)
            via = inner.getConnections(following)[0].getViaLaneID()
            if via == "":
                break

    self.initPath()
    self.pos = list(self.laneShape[0])
def getLeader(self, maxDist):
    """Find the nearest car ahead of this one along its route.

    The lane shape is walked segment by segment, starting on the car's
    current segment and continuing onto the following route edges, while
    the travelled distance is accumulated in ``l``.

    Args:
        maxDist: search stops once the accumulated distance reaches it.

    Returns:
        The closest car found within *maxDist* (``self.leaderDist`` is then
        set to the accumulated distance), otherwise ``None``.
    """
    shapeInd = self.laneInd
    edgeInd = self.index
    prevInd = edgeInd  # NOTE(review): written below but never read in this method
    laneShape = self.laneShape
    laneId = self.laneId
    l = 0  # distance covered so far; 0 also marks "still on the car's own segment"
    # Every other car on the current lane.
    carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
    carsHere = list(filter(lambda c: c.id != self.id, carsHere))
    while(l<maxDist):
        endPos = laneShape[shapeInd+1]
        # Keep the cars located on the shape segment being inspected.
        carsReallyHere = filter(lambda c: c.laneInd == shapeInd, carsHere)
        if l == 0:
            # On our own segment only cars closer to its end than we are count as "ahead".
            carsReallyHere = filter(lambda c: dist(c.pos,endPos) < dist(self.pos,endPos),carsReallyHere)
        closest = None
        carsReallyHere = list(carsReallyHere)
        try:
            # The candidate nearest to the start of the segment is the first one we would reach.
            closest = min(carsReallyHere, key=lambda c: dist(c.pos,laneShape[shapeInd]))
        except ValueError as e:
            # min() of an empty list: nobody on this segment, account for its
            # length and move on to the next one.
            if l == 0:
                l+=dist(self.pos,endPos)
            else:
                l+=dist(laneShape[shapeInd],endPos)
            shapeInd+=1
            if(shapeInd>=len(laneShape)-1):
                # End of the lane shape: continue on the next route edge.
                shapeInd = 0
                edgeInd+=1
                if(not self.route[edgeInd].isSpecial()):
                    prevInd = edgeInd
                carsHere = self.controller.getCarsOnLane(self.route[edgeInd].getID(), laneId)
                carsHere = list(filter(lambda c: c.id != self.id, carsHere)) # original author's note: if this is not turned into a list here, the original filter ends up modified (reason unknown to them)
                if(edgeInd>=len(self.route)-1):
                    # Last route element reached without meeting anyone.
                    return
                newLane = self.getShape(edgeInd)
                laneShape = newLane[0]
                laneId = newLane[2]
        else:
            # A car was found on this segment: add the remaining gap to it.
            if l == 0:
                l+=dist(self.pos, closest.pos)
            else:
                l+=dist(laneShape[shapeInd], closest.pos)
            if l <= maxDist:
                self.leaderDist=l
                return closest
            else:
                return
    return
def getCurrentEdge(self):
    """Return the route edge the car is currently on."""
    current = self.route[self.index]
    return current
def getCarDist(self, car, edge, laneInd, startFromEnd = True):
    """Measure how far *car* is along a lane of *edge*.

    The lane shape is walked from its end (default) or from its start.
    Whole segments are added until the one holding the car
    (``car.laneInd``) is reached, for which only the part up to
    ``car.pos`` counts. If no segment matches, the full shape length is
    returned.
    """
    points = edge.getLane(laneInd).getShape().copy()
    if startFromEnd:
        points.reverse()
    last_segment = len(points) - 2
    travelled = 0
    for step, (start, stop) in enumerate(zip(points, points[1:])):
        # Segment numbering follows the original (non-reversed) shape.
        segment = last_segment - step if startFromEnd else step
        if car.laneInd == segment:
            travelled += dist(start, car.pos)
            return travelled
        travelled += dist(start, stop)
    return travelled
def getDistToEndOfEdge(self):
    """Return the distance from the car to the end of its current lane shape."""
    shape = self.laneShape
    nxt = self.laneInd + 1
    # Rest of the segment the car is on ...
    remaining = 0
    remaining += dist(self.pos, shape[nxt])
    # ... plus every full segment after it.
    for seg_start, seg_end in zip(shape[nxt:], shape[nxt + 1:]):
        remaining += dist(seg_start, seg_end)
    return remaining
def getLeaderAtIntersections(self, maxDist):
    """Look for a car to yield to at the upcoming intersections.

    Starting with the route element after the current one, each element is
    handed to ``getLeaderAtIntersection`` and its length is added to the
    distance covered, until *maxDist* is reached or the route ends.

    Args:
        maxDist: look-ahead distance, measured from the car's position.

    Returns:
        The car to yield to, or ``None``. When a car is found,
        ``self.distToInter`` receives the distance accumulated so far
        (including the length of the element just examined) and
        ``self.leaderAtInterDist`` the distance reported for that car.
    """
    travelled = self.getDistToEndOfEdge()
    edgeInd = self.index + 1
    # The bounds test is part of the loop condition so that a car sitting on
    # the last route element does not index past the end of the route.
    while travelled < maxDist and edgeInd < len(self.route):
        found = self.getLeaderAtIntersection(edgeInd - 1, edgeInd)
        travelled += self.route[edgeInd].getLength()
        if found is not None:
            self.distToInter = travelled
            self.leaderAtInterDist = found[0]
            return found[1]
        edgeInd += 1
    return None
def getLeaderAtIntersection(self, prevInd, edgeInd):
    """Find the closest car this one has to yield to at one junction.

    The junction considered is the one crossed just before
    ``route[edgeInd]``. For every link flagged in the junction's
    prohibition string for our own link, cars are searched on the incoming
    lane, on the internal lane of that link, and one junction further
    upstream (its internal lanes and incoming edges).

    Args:
        prevInd: route index at or before the junction; moved backwards
            until it designates a non-special edge.
        edgeInd: route index of the edge reached after the junction.

    Returns:
        ``(distance, car)`` for the closest candidate, or ``None`` when
        *edgeInd* designates a special edge, when the junction data cannot
        be resolved, or when no car qualifies.
    """
    # Only calls that target a regular edge are evaluated. (This replaces a
    # ``while`` loop whose body returned immediately, leaving its
    # skip-forward code unreachable.)
    if self.route[edgeInd].isSpecial():
        return None
    # Walk back to the last regular edge before the junction.
    while self.route[prevInd].isSpecial():
        prevInd -= 1
        if prevInd < 0:
            print(self.id, "fu2")
            return None

    # Connection between the edge before and the edge after the junction;
    # only the first one is kept, and without any we give up.
    inter = self.route[edgeInd-1].getFromNode()
    connections = self.route[prevInd].getConnections(self.route[edgeInd])
    if len(connections) == 0:
        print("pas de connections")
        return None
    connection = connections[0]

    # Priority information for our link.
    linkInd = inter.getLinkIndex(connection)
    if linkInd == -1:  # should not happen, yet evidently does
        print(self.id, "fu3")
        return None
    # NOTE(review): ``_prohibits`` is a private attribute of the node object.
    resp = inter._prohibits[linkInd]

    # Lookup table from link index to connection.
    connRaw = inter.getConnections()
    conn = [0] * len(resp)
    for c in connRaw:
        ind = inter.getLinkIndex(c)
        if ind == -1:
            continue
        conn[ind] = c

    # For every link flagged for ours, collect the closest car on each
    # lane that feeds it.
    cars = []
    for i, f in enumerate(reversed(resp)):
        if f == '0':
            continue
        edge = conn[i].getFrom()
        laneInd = conn[i].getFromLane().getIndex()
        intLane = self.map.getLane(conn[i].getViaLaneID())
        intLaneLgt = intLane.getLength()

        # Cars on the incoming lane that head for the same target edge.
        carsInEdge = self.controller.getCarsOnLane(edge.getID(), laneInd)
        carsInEdge = list(filter(lambda c: c.nextNonSpecialEdge() == conn[i].getTo(), carsInEdge))
        if len(carsInEdge) != 0:
            carsInEdge = zip([self.getCarDist(c, edge, laneInd)+intLaneLgt for c in carsInEdge], carsInEdge)
            closest = min(carsInEdge, key=lambda c: c[0])
            cars.append(closest)

        # Cars already on the internal lane of that link.
        intEdge = intLane.getEdge()
        carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
        if len(carsInEdge) != 0:
            carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex()) for c in carsInEdge], carsInEdge)
            closest = min(carsInEdge, key=lambda c: c[0])
            cars.append(closest)

        # Also look one junction upstream: its internal lanes, then the
        # edges that lead into it.
        prevLanesLgt = intLaneLgt + conn[i].getFromLane().getLength()
        prevNode = edge.getFromNode()
        for intLaneID in prevNode.getInternal():
            intLane = self.map.getLane(intLaneID)
            intEdge = intLane.getEdge()
            carsInEdge = list(self.controller.getCarsOnLane(intEdge.getID(), intLane.getIndex()))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, intEdge, intLane.getIndex())+prevLanesLgt for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cl = closest[1]
                # TODO: the look-ahead offset 3 is an arbitrary value.
                if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo():
                    cars.append(closest)
        for incEdge in prevNode.getIncoming():
            if incEdge.isSpecial():
                continue
            incLane = incEdge.getLane(0)
            intConn = incEdge.getConnections(edge)
            intLaneLgt = 0
            if len(intConn) != 0:
                intLaneLgt = self.map.getLane(intConn[0].getViaLaneID()).getLength()
            carsInEdge = list(self.controller.getCarsOnEdge(incEdge.getID()))
            if len(carsInEdge) != 0:
                carsInEdge = zip([self.getCarDist(c, incEdge, incLane.getIndex())+prevLanesLgt+intLaneLgt for c in carsInEdge], carsInEdge)
                closest = min(carsInEdge, key=lambda c: c[0])
                cl = closest[1]
                # TODO: same test as for the internal lanes above; the two
                # loops could be merged.
                if cl.nextNonSpecialEdge() == edge and cl.nextNonSpecialEdge(3) == conn[i].getTo():
                    cars.append(closest)

    if len(cars) == 0:
        return None
    cDist, closest = min(cars, key=lambda c: c[0])
    return (cDist, closest)
def turnNext(self):
    """Tell whether a turn signal is set for the current route edge."""
    blinker = self.cligno[self.index]
    signal_off = blinker == self.CLIGNO_NONE
    return not signal_off
def nextNonSpecialEdge(self, startOffset = 1):
    """Return the first non-special route edge at or after ``index + startOffset``.

    ``None`` is returned when the rest of the route holds no such edge.
    """
    upcoming = islice(self.route, self.index + startOffset, None)
    for candidate in upcoming:
        if not candidate.isSpecial():
            return candidate
    return None
def draw(self,painter, colorOverride):
    """Paint the car with *painter*.

    Draws a circle at the car's position and a short line along its
    heading, then a small yellow dot on the side of the active turn signal
    and, while ``vroom`` counts down, a "vroom" label.

    Args:
        painter: painter to draw with.
        colorOverride: when truthy, the pen already set on *painter* is
            left untouched.
    """
    centre = QPointF(*self.pos)

    # Pen colour reflects the car's state unless the caller imposes one.
    if not colorOverride:
        if self.forceThrough:
            painter.setPen(Qt.blue)
        elif self.isLeader > 0:
            painter.setPen(QColor(255, 0, 255))
        elif self.leader is None:
            painter.setPen(Qt.gray)

    # Body and heading indicator.
    painter.drawEllipse(centre, self.size, self.size)
    x, y = self.pos[0], self.pos[1]
    painter.drawLine(x, y, x + 5*cos(self.dir), y + 5*sin(self.dir))

    # Turn signal: a dot perpendicular to the heading, on the side of the turn.
    if len(self.cligno) != 0:
        blinker = self.cligno[self.index]
        if blinker != self.CLIGNO_NONE:
            painter.save()
            painter.setPen(Qt.yellow)
            angle = self.dir + pi/2
            if blinker == self.CLIGNO_RIGHT:
                angle += pi
            offset = self.size * QPointF(cos(angle), sin(angle))
            painter.drawEllipse(centre + offset, 3, 3)
            painter.restore()

    # "vroom" label: moves and shrinks as the countdown decreases.
    if self.vroom != 0:
        painter.save()
        shift = (60 - self.vroom) * 0.2
        painter.translate(centre + QPointF(0, shift))
        painter.scale(1, -1)
        font = painter.font()
        font.setPixelSize(ceil(self.vroom * 0.2))
        painter.setFont(font)
        painter.drawText(QPointF(0, 0), "vroom")
        self.vroom -= 1
        painter.restore()
def calcTti(self, dist, v0, vmax, a):
    """Estimate the time needed to cover *dist* and the speed on arrival.

    The vehicle is assumed to accelerate at the constant rate *a* from *v0*
    until it reaches *vmax*, then to hold *vmax*.

    Args:
        dist: distance to cover.
        v0: current speed.
        vmax: speed cap.
        a: acceleration; must be non-zero.

    Returns:
        ``(tti, sai)``: time to reach the target ("time to intersection")
        and speed at that moment ("speed at intersection").
    """
    # "Time to max speed". A vehicle already faster than vmax has no
    # acceleration phase left; without the clamp this time is negative and
    # the result is meaningless (e.g. a non-zero time for zero distance).
    # Such a vehicle is approximated as cruising at vmax.
    ttms = max(0.0, (vmax - v0) / a)
    # Discriminant of the quadratic giving the time to cover ``dist`` under
    # constant acceleration.
    delta = v0**2 + 2*a*dist
    # Time and arrival speed when vmax is never reached.
    tti = (-v0 + sqrt(delta)) / a
    sai = v0 + tti*a
    if tti > ttms:
        # vmax is reached before the target: accelerate for ``ttms``, then
        # cover the remaining distance at vmax.
        dbvm = a/2 * ttms**2 + v0 * ttms  # distance covered before vmax
        tti = ttms + (dist - dbvm) / vmax
        sai = vmax
    return (tti, sai)
def conduiteKrauss(self, vmax, leader, leaderAtInter, dt):
    """Compute the car's new speed ``self.v`` for one time step.

    The new speed is the smallest of: the current speed plus one step of
    acceleration, *vmax*, a safe speed behind *leader*, and a speed imposed
    by *leaderAtInter* (accelerate to merge, or brake). ``self.nu`` is then
    subtracted and the result is floored at 0.

    Args:
        vmax: speed limit to apply for this step.
        leader: car directly ahead, or ``None``.
        leaderAtInter: car to yield to at an upcoming intersection, or ``None``.
        dt: duration of the time step.
    """
    # Debugging switch, permanently disabled by the leading ``False``.
    if False and ("f_0" in self.id or "f_1" in self.id or "f_2" in self.id):
        self.v = 0
        return
    if leaderAtInter is None:
        self.forceThrough = False
    # Free road: simply accelerate towards vmax.
    if leader is None and leaderAtInter is None:
        vd = min(self.v + self.a * dt, vmax)
        self.v = max(0, vd-self.nu)
        return
    vsecInter = vmax
    # An intersection with a car to yield to is ahead.
    if(leaderAtInter is not None):
        # Track for how long we have been standing still.
        if self.v == 0:
            self.timeAtInter += dt
        else:
            self.timeAtInter = 0
        vleader = leaderAtInter.v
        # Compute the time we need to reach the intersection
        # and the time the other car needs.
        lvmax = leaderAtInter.vmax
        if leaderAtInter.getCurrentEdge().isSpecial():
            lvmax = leaderAtInter.nextNonSpecialEdge().getSpeed()
        nextInternalIndex = self.index # For this car the duration would ideally be computed from the speed on each stretch,
        while not self.route[nextInternalIndex].isSpecial(): # but for now only the speed of the internal stretch (usually the slowest) is used.
            nextInternalIndex += 1
        tti, sai = self.calcTti(self.distToInter, self.v, self.route[nextInternalIndex].getLane(0).getSpeed(), self.a) # TODO : laneID
        ltti, lsai = self.calcTti(self.leaderAtInterDist, vleader, lvmax, leaderAtInter.a)
        lta = (lvmax-vleader) / leaderAtInter.a # time during which the other car accelerates (i.e. we gain no relative speed) (assumes leader.a == self.a)
        marg = lta + (lsai-sai) / self.a # margin needed to accelerate after the intersection without being caught up
        tts = self.v/self.b # time to stop if we brake now
        dts = self.v*tts - (self.b*tts**2)/2 # distance covered during tts while braking
        # Stuck in a circular dependency: force our way through.
        if self.timeAtInter >= 1 and self.leader is None and self.circularLeaderDep(self.leaderAtInter, 20):
            self.forceThrough = True
        # If we are far enough from the intersection (the other car does not matter yet)
        # or can reach it before the other car (plus a safety margin),
        # accelerate to merge.
        if self.distToInter > self.interMinSpace + dts or (tti + leaderAtInter.T + marg) < ltti:
            vsecInter = min(vmax, self.v + self.a*dt)
        else:# otherwise brake
            vsecInter = max(0, self.v - self.b*dt)
        if self.forceThrough:
            self.leaderAtInter = None # Drop the leader so that only the first car detects the circular dependency
            vsecInter = min(vmax, self.v + self.a*dt)
    vsec = vmax
    if leader is not None:
        # Safe speed behind the leader, from the gap left after the headway
        # distance and the minimum spacing.
        vleader = leader.v
        bleader = leader.b
        vb = (vleader + self.v) / 2
        bb = (bleader + self.b) / 2
        vsec = vleader + (self.leaderDist - vleader * self.T - self.minSpace)/((vb/bb) + self.T)
    vd = min(self.v + self.a * dt, vsec, vmax, vsecInter)
    self.v = max(0, vd-self.nu)
    self.updateGraph(self.v, vmax, vsec, vsecInter)
# fonction pour verifier si on as pas une dependence circulaire de leader
def circularLeaderDep(self, car, timeout):
    """Tell whether following leader links from *car* leads back to this car.

    Both ``leader`` and ``leaderAtInter`` links are followed.

    Args:
        car: car to start from (may be ``None``).
        timeout: maximum number of cars on a chain, *car* included, that
            are examined.

    Returns:
        ``True`` if a car with this car's ``id`` is found within *timeout*
        steps, ``False`` otherwise.
    """
    # Level-by-level search: each car is expanded once, at the smallest
    # depth it can be reached, so the answer matches an exhaustive walk of
    # every chain while the cost stays linear in the number of cars.
    frontier = [car]
    seen = set()
    while timeout > 0 and frontier:
        following = []
        for candidate in frontier:
            if candidate is None or id(candidate) in seen:
                continue
            if candidate.id == self.id:
                return True
            seen.add(id(candidate))
            following.append(candidate.leader)
            following.append(candidate.leaderAtInter)
        frontier = following
        timeout -= 1
    return False
def updateGraph(self, v, vmax, vsec, interVsec):
    """Send one point per speed series to the info widget's graph.

    Nothing is sent when no info widget is attached. Points are emitted as
    ``(series, time, value)`` in the order: series 2 (*v*), 0 (*vmax*),
    1 (*vsec*), 3 (*interVsec*).
    """
    if self.infoWidg is None:
        return
    for series, value in ((2, v), (0, vmax), (1, vsec), (3, interVsec)):
        self.signals.addGraphPt.emit((series, self.controller.t, value))
def update(self,dt):
    """Advance the car by one simulation step of duration *dt*.

    Does nothing before ``startTime``. Otherwise the leaders are looked up,
    the speed is recomputed by ``conduiteKrauss``, statistics are updated
    and the car is moved along its lane shape, switching to the next route
    edge when a shape is exhausted. On reaching the last route element the
    controller is asked to destroy the car.
    """
    if self.controller.t < self.startTime:
        return
    # Disabled debugging check, kept as an inert string literal.
    """
if self.v < 1 and self.getCurrentEdge().isSpecial():
print(f"{self.id} stalled where he souldn't have")
"""
    # Cars that constrain us, searched within a distance of 100.
    self.leader=self.getLeader(100)
    self.leaderAtInter=self.getLeaderAtIntersections(100)
    vmax = self.vmax
    if self.dynSpeed:
        # Controller-scaled limit, never above the lane limit.
        # NOTE(review): the floor of 8 can exceed the lane limit when the latter is below 8 - confirm this is intended.
        vmax = max(min(self.vmax * self.controller.dynSpeedRat, self.vmax), 8)
    self.conduiteKrauss(vmax,self.leader,self.leaderAtInter,dt)
    # Statistics.
    if self.v == 0:
        self.timeStopped += dt
    self.speedPercentage += self.v/self.vmax
    self.ticksLived += 1
    lgt=self.v*dt  # distance still to travel during this step
    # Heading towards the end of the current segment, computed before moving.
    self.dir = atan2(self.laneShape[self.laneInd+1][1]-self.pos[1],self.laneShape[self.laneInd+1][0]-self.pos[0])
    while(lgt>0):
        endPos=self.laneShape[self.laneInd+1]
        l=dist(self.pos,endPos)
        if lgt>=l:
            # The rest of this segment is consumed: jump to the next one.
            lgt-=l
            pos=list(self.laneShape[-1])  # NOTE(review): local value, never used afterwards
            self.laneInd+=1
            if(self.laneInd>=len(self.laneShape)-1):
                # Lane shape exhausted: move on to the next route edge.
                self.laneInd=0
                self.index+=1
                if(self.index>=len(self.route)-1):
                    # Last route element reached: rewind the index and ask the
                    # controller to destroy the car. The loop keeps running
                    # for the rest of this step.
                    self.index=0
                    self.controller.destroyCar(self)
                self.initPath()
            self.pos=list(self.laneShape[self.laneInd])
            continue
        # Partial move inside the current segment.
        adv=lgt/l
        self.pos[0]+=(endPos[0]-self.pos[0])*adv
        self.pos[1]+=(endPos[1]-self.pos[1])*adv
        lgt=0
    # Randomly start the "vroom" effect when it is enabled.
    if self.controller.vroomEnable and randint(0,100) == 0:
        self.vroom = 60
    if self.infoWidg is None:
        return
    # Refresh the info widget.
    self.signals.updateDisp.emit(("Position", self.pos))
    self.signals.updateDisp.emit(("Vitesse", self.v))
    self.signals.updateDisp.emit(("Leader", f"{self.leader.id if self.leader is not None else 'None'} @ {self.leaderDist:.2f}m : {self.leaderAtInter.id if self.leaderAtInter is not None else 'None'} @ {self.leaderAtInterDist:.2f})"))
def __copy__(self):
    """Return a new ``Car`` built from the same constructor arguments.

    The clone shares the ``route`` and ``cligno`` lists with this car and
    gets its own copies of ``pos`` and ``laneShape``; ``laneId`` and
    ``vmax`` are carried over. All other state starts from the constructor
    defaults.
    """
    clone = Car(
        self.id,
        self.rawRoute,
        self.startTime,
        self.dynSpeed,
        self.IA,
        self.map,
        self.controller,
        self.infoWidg,
    )
    # Shared by reference, not duplicated.
    clone.route, clone.cligno = self.route, self.cligno
    # Independent copies.
    clone.pos = self.pos.copy()
    clone.laneShape = self.laneShape.copy()
    clone.laneId, clone.vmax = self.laneId, self.vmax
    return clone