# Zhihao Zhang
# roadway class in python
from src.curves import CurvePt
from src.Vec import VecSE2, VecE2
from src.Vec.geom import geom
import math
import re
import warnings
[docs]class LaneBoundary:
def __init__(self, style: str, color: str):
'''
:param style: LaneBoundary style
:param color: LaneBoundary color
'''
self.style = style
self.color = color
[docs]class SpeedLimit:
def __init__(self, lo: float, hi: float):
'''
:param lo: the lowest speed allowed
:param hi: the highest speed allowed
'''
self.lo = lo
self.hi = hi
NULL_BOUNDARY = LaneBoundary("unknown", "unknown")
DEFAULT_SPEED_LIMIT = SpeedLimit(-math.inf, math.inf)
DEFAULT_LANE_WIDTH = 3.0
METERS_PER_FOOT = 0.3048
[docs]class LaneTag:
def __init__(self, segment: int, lane: int):
'''
:param segment: the segment which this lane belongs to
:param lane: which lane it belongs to
'''
self.segment = segment
self.lane = lane
NULL_LANETAG = LaneTag(0, 0)
[docs]class RoadIndex:
'''
RoadIndex{I <: Integer, T <: Real}
A data structure to index points in a roadway. Calling `roadway[roadind]` will return the point
associated to the road index.
# Fields
- `ind::CurveIndex{I,T}` the index of the point in the curve
- `tag::LaneTag` the lane tag of the point
'''
def __init__(self, ind: CurvePt.CurveIndex, tag: LaneTag):
self.ind = ind
self.tag = tag
[docs] def write(self, fp):
fp.write("%d %.6f %d %d\n" % (self.ind.i, self.ind.t, self.tag.segment, self.tag.lane))
NULL_ROADINDEX = RoadIndex(CurvePt.CurveIndex(-1, None), LaneTag(-1, -1))
[docs]class LaneConnection:
'''
LaneConnection{I <: Integer, T <: Real}
Data structure to specify the connection of a lane. It connects `mylane` to the point `target`.
`target` would typically be the starting point of a new lane.
- `downstream::Bool`
- `mylane::CurveIndex{I,T}`
- `target::RoadIndex{I,T}`
'''
def __init__(self, downstream: bool, mylane: CurvePt.CurveIndex, target: RoadIndex):
self.downstream = downstream # if true, mylane → target, else target → mylane
self.mylane = mylane
self.target = target
[docs] def write(self, fp):
fp.write("%s (%d %.6f) " % ("D" if self.downstream else "U", self.mylane.i, self.mylane.t))
self.target.write(fp)
[docs]def parse_lane_connection(line: str):
cleanedline = re.sub(r"(\(|\))", "", line)
# print(cleanedline)
tokens = cleanedline.split()
assert tokens[0] == "D" or tokens[0] == "U"
downstream = (tokens[0] == "D")
mylane = CurvePt.CurveIndex(int(tokens[1]), float(tokens[2]))
target = RoadIndex(
CurvePt.CurveIndex(int(tokens[3]), float(tokens[4])),
LaneTag(int(tokens[5]), int(tokens[6]))
)
return LaneConnection(downstream, mylane, target)
[docs]class Lane:
'''
Lane class
'''
def __init__(self, tag: LaneTag, curve: list, width: float = DEFAULT_LANE_WIDTH, speed_limit: SpeedLimit = DEFAULT_SPEED_LIMIT,
boundary_left: LaneBoundary = NULL_BOUNDARY, boundary_right: LaneBoundary = NULL_BOUNDARY, exits: list = [], entrances: list = [],
next: RoadIndex = NULL_ROADINDEX, prev: RoadIndex = NULL_ROADINDEX):
'''
Lane
A driving lane on a roadway. It identified by a `LaneTag`. A lane is defined by a curve which
represents a center line and a width. In addition it has attributed like speed limit.
A lane can be connected to other lane in the roadway, the connection are specified in the exits
and entrances fields.
# Fields
- `tag::LaneTag`
- `curve::List{CurvePt}`
- `width::float` [m]
- `speed_limit::SpeedLimit`
- `boundary_left::LaneBoundary`
- `boundary_right::LaneBoundary`
- `exits::List{LaneConnection} # list of exits; put the primary exit (at end of lane) first`
- `entrances::List{LaneConnection} # list of entrances; put the primary entrance (at start of lane) first`
'''
self.tag = tag
self.curve = curve # Array of CurvePt
self.width = width
self.speed_limit = speed_limit
self.boundary_left = boundary_left
self.boundary_right = boundary_right
self.exits = exits # Array of LaneConnection
self.entrances = entrances # Array of LaneConnection
if next != NULL_ROADINDEX:
self.exits.insert(0, LaneConnection(True, CurvePt.curveindex_end(self.curve), next))
if prev != NULL_ROADINDEX:
self.entrances.insert(0, LaneConnection(False, CurvePt.CURVEINDEX_START, prev))
[docs] def get_by_ind_roadway(self, ind: CurvePt.CurveIndex, roadway):
'''
get CurvePt by Curve Index and roadway
:param ind: Curve Index
:param roadway: Roadway object
:return: the corresponding CurvePt object
'''
# print(ind.i, len(self.curve))
if ind.i == -1:
pt_lo = prev_lane_point(self, roadway)
pt_hi = self.curve[0]
s_gap = VecE2.norm(VecE2.VecE2(pt_hi.pos - pt_lo.pos))
pt_lo = CurvePt.CurvePt(pt_lo.pos, -s_gap, pt_lo.k, pt_lo.kd)
return CurvePt.lerp(pt_lo, pt_hi, ind.t)
elif ind.i < len(self.curve) - 1:
return CurvePt.get_curve_list_by_index(self.curve, ind)
else:
pt_hi = next_lane_point(self, roadway)
pt_lo = self.curve[-1]
s_gap = VecE2.norm(VecE2.VecE2(pt_hi.pos - pt_lo.pos))
pt_hi = CurvePt.CurvePt(pt_hi.pos, pt_lo.s + s_gap, pt_hi.k, pt_hi.kd)
return CurvePt.lerp(pt_lo, pt_hi, ind.t)
[docs]def has_next(lane: Lane):
'''
check if the given lane has next lane
:param lane: the given lane
:return: true if there is, false otherwise
'''
return (not len(lane.exits) == 0) and lane.exits[0].mylane == CurvePt.curveindex_end(lane.curve)
[docs]def has_prev(lane: Lane):
'''
check if the given lane has previous lane
:param lane: the given lane
:return: true if there is, false otherwise
'''
return (not len(lane.entrances) == 0) and lane.entrances[0].mylane == CurvePt.CURVEINDEX_START
[docs]def connect(source: Lane, dest: Lane):
cindS = CurvePt.curveindex_end(source.curve)
cindD = CurvePt.CURVEINDEX_START
source.exits.insert(0, LaneConnection(True, cindS, RoadIndex(cindD, dest.tag)))
dest.entrances.insert(0, LaneConnection(False, cindD, RoadIndex(cindS, source.tag)))
return source, dest
[docs]class RoadSegment:
def __init__(self, id: int, lanes: list = []):
'''
:param id: the identification number of the corresponding road segment
:param lanes: list of lane in this segment
'''
self.id = id # integer
self.lanes = lanes # Array of Lane
[docs]class Roadway:
def __init__(self, segments: list = []):
'''
This is the Roadway class
:param segments: A list of RoadSegment
'''
self.segments = segments # Array of RoadSegment
[docs] def get_by_tag(self, tag: LaneTag):
'''
get lane by a query lane tag
:param tag: query LaneTag
:return: the lane correspond to the query lane tag
'''
seg = self.get_by_id(tag.segment)
# print(seg.id, len(seg.lanes), tag.lane)
return seg.lanes[tag.lane]
[docs] def get_by_id(self, segid: int):
'''
get seg by a query seg id
:param segid: query seg id
:return: the seg correspond to the query seg id
'''
for seg in self.segments:
if seg.id == segid:
return seg
raise IndexError("Could not find segid {} in roadway".format(segid))
[docs] def get_by_roadindex(self, roadindex: RoadIndex):
'''
get lane by a query road index
:param roadindex: query road index
:return: the lane correspond to the query road index
'''
lane = self.get_by_tag(roadindex.tag)
return lane.get_by_ind_roadway(roadindex.ind, self)
[docs] def has_segment(self, segid: int):
'''
check if the roadway object has a seg given the seg id
:param segid: seg id
:return: true if there is, false otherwise
'''
for seg in self.segments:
if seg.id == segid:
return True
return False
[docs] def write(self, filepath: str):
'''
write the roadway information to the file given the filepath
:param filepath: the file to write, string
:return: no return
'''
# writes to a text file
with open(filepath, "w") as fp:
fp.write("ROADWAY\n")
fp.write("{}\n".format(len(self.segments)))
for seg in self.segments:
fp.write("{}\n".format(seg.id))
fp.write("\t{}\n".format(len(seg.lanes)))
for (i, lane) in enumerate(seg.lanes):
assert (lane.tag.lane == i + 1)
fp.write("\t%d\n" % (i + 1))
fp.write("\t\t%.3f\n" % lane.width)
fp.write("\t\t%.3f %.3f\n" % (lane.speed_limit.lo, lane.speed_limit.hi))
fp.write("\t\t{} {}\n".format(lane.boundary_left.style, lane.boundary_left.color))
fp.write("\t\t{} {}\n".format(lane.boundary_right.style, lane.boundary_right.color))
fp.write("\t\t{}\n".format(len(lane.exits) + len(lane.entrances)))
for conn in lane.exits:
fp.write("\t\t\t")
conn.write(fp)
fp.write("\n")
for conn in lane.entrances:
fp.write("\t\t\t")
conn.write(fp)
fp.write("\n")
fp.write("\t\t{}\n".format(len(lane.curve)))
for pt in lane.curve:
fp.write("\t\t\t(%.4f %.4f %.6f) %.4f %.8f %.8f\n" % (pt.pos.x, pt.pos.y, pt.pos.theta,
pt.s, pt.k, pt.kd))
[docs]def read_roadway(fp):
'''
parse roadway file and read it to a file given the file pointer
:param fp: the file pointer
:return: no return
'''
lines = fp.readlines()
fp.close()
line_index = 0
if "ROADWAY" in lines[line_index]: #文件第一行 ROADWAY title
line_index += 1 #继续读下一行
nsegs = int(lines[line_index].strip()) # 读Roadway.segments的长度
line_index += 1 #继续读下一行
roadway = Roadway([])
for i_seg in range(nsegs): # 循环读每个segments的内容
segid = int(lines[line_index].strip()) # parse segments.id
line_index += 1 #继续读下一行
nlanes = int(lines[line_index].strip()) # 读出Roadway.segments.lanes的长度
line_index += 1 #继续读下一行
seg = RoadSegment(segid, []) #
for i_lane in range(nlanes): # 循环读每个lane的内容
assert i_lane + 1 == int(lines[line_index].strip()) # 这里是用来确认lane符合顺序且读的方式正确
line_index += 1 #继续读下一行
tag = LaneTag(segid, i_lane) # make Roadway.segments.lanes.tag
#print(segid, nlanes, i_lane)
width = float(lines[line_index].strip()) # parse width
line_index += 1 #继续读下一行
tokens = (lines[line_index].strip()).split()
line_index += 1 #继续读下一行
speed_limit = SpeedLimit(float(tokens[0]), float(tokens[1])) # parse speed limit
tokens = (lines[line_index].strip()).split()
line_index += 1 #继续读下一行
boundary_left = LaneBoundary(tokens[0], tokens[1]) # parse boundary_left
tokens = (lines[line_index].strip()).split()
line_index += 1 #继续读下一行
boundary_right = LaneBoundary(tokens[0], tokens[1]) # parse boundary_right
exits = []
entrances = []
n_conns = int(lines[line_index].strip()) # 这里应该是Roadway.segments.lanes.exits以及entrances的长度
line_index += 1 #继续读下一行
for i_conn in range(n_conns): # 循环parse每个exit以及entrance
conn = parse_lane_connection(lines[line_index].strip()) # parse LaneConnection
line_index += 1 #继续读下一行
if conn.downstream:
exits.append(conn) # if downstream 就是exit的数据
else:
entrances.append(conn) # else entrance的数据
npts = int(lines[line_index].strip()) # 有多少个curve点————Roadway.segments.lanes.curve的长度
line_index += 1 #继续读下一行
curve = []
for i_pt in range(npts): # 循环parse CurvePt
line = lines[line_index].strip()
line_index += 1 #继续读下一行
cleanedline = re.sub(r"(\(|\))", "", line)
tokens = cleanedline.split()
x = float(tokens[0])
x = x/METERS_PER_FOOT
y = float(tokens[1])
y = y/METERS_PER_FOOT
theta = float(tokens[2])
s = float(tokens[3])
k = float(tokens[4])
kd = float(tokens[5])
curve.append(CurvePt.CurvePt(VecSE2.VecSE2(x, y, theta), s, k, kd)) # append CurvePt in Roadway.segments.lanes.curve
seg.lanes.append(Lane(tag, curve, width=width, speed_limit=speed_limit,
boundary_left=boundary_left,
boundary_right=boundary_right,
entrances=entrances, exits=exits)) # append lane in Roadway.segments.lanes
roadway.segments.append(seg) # append segs in Roadway.segments
#print(len(roadway.segments))
return roadway
[docs]class RoadProjection:
def __init__(self, curveproj: CurvePt.CurveProjection, tag: LaneTag):
'''
RoadProjection class
:param curveproj: CurveProjection object
:param tag: LaneTag
'''
self.curveproj = curveproj
self.tag = tag
[docs]def next_lane(lane: Lane, roadway: Roadway):
'''
get next lane
:param lane: Lane object
:param roadway: Roadway object
:return: the lane after the given lane
'''
return roadway.get_by_tag(lane.exits[0].target.tag)
[docs]def prev_lane(lane: Lane, roadway: Roadway):
'''
get previous lane
:param lane: Lane object
:param roadway: Roadway object
:return: the lane before the given lane
'''
return roadway.get_by_tag(lane.entrances[0].target.tag)
[docs]def next_lane_point(lane: Lane, roadway: Roadway):
'''
get next lane point
:param lane: Lane object
:param roadway: Roadway object
:return: the lane point after the given lane
'''
return roadway.get_by_roadindex(lane.exits[0].target)
[docs]def prev_lane_point(lane: Lane, roadway: Roadway):
'''
get previous lane point
:param lane: Lane object
:param roadway: Roadway object
:return: the lane point before the given lane
'''
return roadway.get_by_roadindex(lane.entrances[0].target)
[docs]def sign(a):
'''
check the sign of a given real number
:param a: the given real number
:return: 1 if >0, -1 if <0, 0 if ==0
'''
if a > 0:
return 1
elif a < 0:
return -1
else:
return 0
[docs]def get_closest_perpendicular_point_between_points(A: VecSE2.VecSE2, B: VecSE2.VecSE2, Q: VecSE2.VecSE2,
tolerance: float = 0.01, # acceptable error in perpendicular component
max_iter: int = 50, # maximum number of iterations
):
'''
get closest perpendicular point between given points
CONDITIONS: a < b, either f(a) < 0 and f(b) > 0 or f(a) > 0 and f(b) < 0
:param A: start point
:param B: end point
:param Q: query point
:param tolerance: acceptable error in perpendicular component
:param max_iter: maximum number of iterations
:return: value which differs from a root of f(x)=0 by less than TOL
'''
# CONDITIONS: a < b, either f(a) < 0 and f(b) > 0 or f(a) > 0 and f(b) < 0
# OUTPUT: value which differs from a root of f(x)=0 by less than TOL
a = 0.0
b = 1.0
f_a = geom.inertial2body(Q, A).x
f_b = geom.inertial2body(Q, B).x
if sign(f_a) == sign(f_b): # both are wrong - use the old way
t = CurvePt.get_lerp_time_unclamped_3(A, B, Q)
t = CurvePt.clamp(t, 0.0, 1.0)
return t, VecSE2.lerp(A, B, t)
iter = 1
while iter <= max_iter:
c = (a+b)/2 # new midpoint
footpoint = VecSE2.lerp(A, B, c)
f_c = geom.inertial2body(Q, footpoint).x
if abs(f_c) < tolerance: # solution found
return c, footpoint
if sign(f_c) == sign(f_a):
a, f_a = c, f_c
else:
b = c
iter += 1
# Maximum number of iterations passed
# This will occur when we project with a point that is not actually in the range,
# and we converge towards one edge
if a == 0.0:
return 0.0, A
elif b == 1.0:
return 1.0, B
else:
warnings.warn("get_closest_perpendicular_point_between_points - should not happen")
c = (a+b)/2 # should not happen
return c, VecSE2.lerp(A, B, c)
[docs]def proj_1(posG: VecSE2.VecSE2, lane: Lane, roadway: Roadway, move_along_curves: bool = True):
'''
proj(posG::VecSE2, lane::Lane, roadway::Roadway; move_along_curves::Bool=true)
Return the RoadProjection for projecting posG onto the lane.
This will automatically project to the next or prev curve as appropriate.
if `move_along_curves` is false, will only project to lane.curve
'''
curveproj = CurvePt.proj(posG, lane.curve)
rettag = lane.tag
# print(curveproj.ind.i)
if has_next(lane) or has_prev(lane):
print(has_prev(lane), has_next(lane))
if curveproj.ind == CurvePt.CurveIndex(0, 0.0) and has_prev(lane):
pt_lo = prev_lane_point(lane, roadway)
pt_hi = lane.curve[0]
t = CurvePt.get_lerp_time_unclamped_2(pt_lo, pt_hi, posG)
if t <= 0.0 and move_along_curves:
return proj_1(posG, prev_lane(lane, roadway), roadway)
elif t < 1.0:
assert ((not move_along_curves) or 0.0 <= t < 1.0)
# t was computed assuming a constant angle
# this is not valid for the large distances and angle disparities between lanes
# thus we now use a bisection search to find the appropriate location
t, footpoint = get_closest_perpendicular_point_between_points(pt_lo.pos, pt_hi.pos, posG)
ind = CurvePt.CurveIndex(-1, t)
curveproj = CurvePt.get_curve_projection(posG, footpoint, ind)
elif curveproj.ind == CurvePt.curveindex_end(lane.curve) and has_next(lane):
pt_lo = lane.curve[-1]
pt_hi = next_lane_point(lane, roadway)
t = CurvePt.get_lerp_time_unclamped_2(pt_lo, pt_hi, posG)
if t >= 1.0 and move_along_curves:
return proj_1(posG, next_lane(lane, roadway), roadway)
elif t >= 0.0:
assert ((not move_along_curves) or 0.0 <= t < 1.0)
# t was computed assuming a constant angle
# this is not valid for the large distances and angle disparities between lanes
# thus we now use a bisection search to find the appropriate location
t, footpoint = get_closest_perpendicular_point_between_points(pt_lo.pos, pt_hi.pos, posG)
ind = CurvePt.CurveIndex(len(lane.curve) - 1, t)
curveproj = CurvePt.get_curve_projection(posG, footpoint, ind)
#print(rettag.segment, rettag.lane)
return RoadProjection(curveproj, rettag)
[docs]def proj_2(posG: VecSE2.VecSE2, roadway: Roadway):
'''
proj(posG::VecSE2, seg::RoadSegment, roadway::Roadway)
Return the RoadProjection for projecting posG onto the roadway.
Tries all of the lanes and gets the closest one
'''
best_dist2 = math.inf
best_proj = RoadProjection(CurvePt.CurveProjection(CurvePt.CurveIndex(-1, -1), None, None), NULL_LANETAG)
for seg in roadway.segments:
for lane in seg.lanes:
roadproj = proj_1(posG, lane, roadway, move_along_curves=False) # return RoadProjection
targetlane = roadway.get_by_tag(roadproj.tag) # return Lane
footpoint = targetlane.get_by_ind_roadway(roadproj.curveproj.ind, roadway)
vec = posG - footpoint.pos
dist2 = VecE2.normsquared(VecE2.VecE2(vec.x, vec.y))
# print(best_dist2, dist2, roadproj.curveproj.ind.i)
if dist2 < best_dist2:
best_dist2 = dist2
best_proj = roadproj
return best_proj
[docs]def n_lanes_right(lane: Lane, roadway: Roadway):
'''
:param lane: Lane object
:param roadway: Roadway object
:return: how many lane on the left of the given lane
'''
return lane.tag.lane
[docs]def n_lanes_left(lane: Lane, roadway: Roadway):
'''
:param lane: Lane object
:param roadway: Roadway object
:return: how many lane on the right of the given lane
'''
seg = roadway.get_by_id(lane.tag.segment)
return len(seg.lanes) - lane.tag.lane - 1
[docs]def is_between_segments_lo(ind: CurvePt.CurveIndex):
'''
check if the given CurveIndex is the first
:param ind: CurveIndex
:return: true if is the first, false otherwise
'''
return ind.i == 0
[docs]def is_between_segments_hi(ind: CurvePt.CurveIndex, curve: list):
'''
check if the given CurveIndex is the last
:param ind: CurveIndex
:return: true if is the last, false otherwise
'''
return ind.i == len(curve)
[docs]def is_in_exits(lane: Lane, target: LaneTag):
'''
check if the given LaneTag object is in exits list
:param lane: Lane object
:param target: LaneTag object
:return: true if in exits list, false otherwise
'''
for lc in lane.exits:
if lc.target.tag == target:
return True
return False
[docs]def is_in_entrances(lane: Lane, target: LaneTag):
'''
check if the given LaneTag object is in entrances list
:param lane: Lane object
:param target: LaneTag object
:return: true if in entrances list, false otherwise
'''
for lc in lane.entrances:
if lc.target.tag == target:
return True
return False