from envs.utils import dict_get, max_n_objects, fill_infos_cache, sample_trajdata_vehicle, load_ngsim_trajdatas
from src.Record.frame import Frame
from src.Record.record import SceneRecord, get_scene
from src.Basic.Vehicle import Vehicle
from feature_extractor.utils import build_feature_extractor
from envs.action import AccelTurnrate, propagate
from src.Vec.VecE2 import norm
import numpy as np
import copy
class AutoEnv:
def __init__(self, params: dict, trajdatas: list = None, trajinfos: list = None, roadways: list = None,
reclength: int = 5, delta_t: float = .1, primesteps: int = 50, H: int = 50,
terminate_on_collision: bool = True, terminate_on_off_road: bool = True,
render_params: dict = None):
'''
:param params: dict of environment parameters; must contain "trajectory_filepaths"
:param trajdatas: Vector{ListRecord}
:param trajinfos: Vector{Dict}
:param roadways: Vector{Roadway}
:param reclength: length of the scene record
:param delta_t: Δt::Float64, simulation timestep in seconds
:param primesteps: Int, number of time steps used to prime the scene
:param H: maximum horizon
:param terminate_on_collision: Bool, end the episode on collision
:param terminate_on_off_road: Bool, end the episode when the ego vehicle leaves the road
:param render_params: rendering options, defaults to {"zoom": 5., "viz_dir": "/tmp"}
'''
param_keys = params.keys()
assert "trajectory_filepaths" in param_keys
# optionally overwrite defaults
reclength = dict_get(params, "reclength", reclength)
primesteps = dict_get(params, "primesteps", primesteps)
H = dict_get(params, "H", H)
if render_params is None:
render_params = {"zoom": 5., "viz_dir": "/tmp"}
for (k, v) in dict_get(params, "render_params", {}).items():
render_params[k] = v
terminate_on_collision = dict_get(params, "terminate_on_collision", terminate_on_collision)
terminate_on_off_road = dict_get(params, "terminate_on_off_road", terminate_on_off_road)
if trajdatas is None or trajinfos is None or roadways is None:
trajdatas, trajinfos, roadways = load_ngsim_trajdatas(
params["trajectory_filepaths"],
minlength=primesteps + H
)
print("traj_data_len: ", len(trajdatas))
print("trajinfos: ", trajinfos)
# build components
scene_length = max_n_objects(trajdatas)
scene = Frame()
scene.init(scene_length)
rec = SceneRecord()
rec.init(reclength, delta_t, scene_length)
ext = build_feature_extractor(params)
infos_cache = fill_infos_cache(ext)
self.trajdatas = trajdatas
self.trajinfos = trajinfos
self.roadways = roadways
self.roadway = None
self.scene = scene
self.rec = rec
self.ext = ext
self.egoid = 0
self.ego_veh = None
self.traj_idx = 0
self.t = 0
self.h = 0
self.H = H
self.primesteps = primesteps
self.delta_t = delta_t
self.terminate_on_collision = terminate_on_collision
self.terminate_on_off_road = terminate_on_off_road
self.epid = 0
self.render_params = render_params
self.infos_cache = infos_cache
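# Example of a minimal `params` dict (a sketch; only "trajectory_filepaths" is
# strictly required here, while "reclength", "primesteps", "H", "render_params",
# "terminate_on_collision" and "terminate_on_off_road" optionally override the
# defaults above; any keys consumed by build_feature_extractor are
# extractor-specific and not shown):
#
#   params = {
#       "trajectory_filepaths": ["/path/to/ngsim_trajdata.txt"],
#       "H": 200,
#       "primesteps": 50,
#       "render_params": {"zoom": 5., "viz_dir": "/tmp"},
#   }
#   env = AutoEnv(params)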
def reset(self, offset: int = None, egoid: int = None, start: int = None, traj_idx: int = 0):
'''
:param offset: offset frame number
:param egoid: id of the ego vehicle
:param start: start time step
:param traj_idx: index of the trajectory to sample from
:return: features of the reset state
'''
if offset is None:
offset = self.H + self.primesteps
self.traj_idx, self.egoid, self.t, self.h = sample_trajdata_vehicle(
self.trajinfos,
offset,
traj_idx,
egoid,
start
)
self.epid += 1
self.rec.empty()
self.scene.empty()
# prime
for t in range(self.t, (self.t + self.primesteps + 1)):
self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], t)
# print("step {}:".format(t))
# for i in range(self.scene.n):
# print(self.scene[i].id)
self.rec.update(self.scene)
# set the ego vehicle
print("egoid: ", egoid)
self.ego_veh = self.scene[self.scene.findfirst(self.egoid)]
# set the roadway
self.roadway = self.roadways[self.traj_idx]
# self.t is the next timestep to load
self.t += self.primesteps + 1
# enforce a maximum horizon
self.h = min(self.h, self.t + self.H)
return self.get_features()
def _step(self, action: list):
'''
:param action: the action to take for the current step
:return: step information for the current step
'''
# convert action into form
# action[0] is `a::Float64` longitudinal acceleration [m/s^2]
# action[1] is `ω::Float64` desired turn rate [rad/sec]
action = action[0] # input should be a (n_agents,n_actions) size array
ego_action = AccelTurnrate(action[0], action[1])
# propagate the ego vehicle
ego_state = propagate(
self.ego_veh,
ego_action,
self.roadway,
self.delta_t
)
# update the ego_veh
self.ego_veh = Vehicle(ego_state, self.ego_veh.definition, self.ego_veh.id)
# load the actual scene, and insert the vehicle into it
self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], self.t)
vehidx = self.scene.findfirst(self.egoid)
orig_veh = self.scene[vehidx] # for infos purposes
self.scene[vehidx] = self.ego_veh
# update rec with current scene
self.rec.update(self.scene)
# compute info about the step
step_infos = dict()
step_infos["rmse_pos"] = norm((orig_veh.state.posG - self.ego_veh.state.posG))
step_infos["rmse_vel"] = abs((orig_veh.state.v - self.ego_veh.state.v))
step_infos["rmse_t"] = abs((orig_veh.state.posF.t - self.ego_veh.state.posF.t))
step_infos["x"] = self.ego_veh.state.posG.x
step_infos["y"] = self.ego_veh.state.posG.y
step_infos["orig_x"] = orig_veh.state.posG.x
step_infos["orig_y"] = orig_veh.state.posG.y
step_infos["s"] = self.ego_veh.state.posF.s
step_infos["phi"] = self.ego_veh.state.posF.phi
return step_infos
def _extract_rewards(self, infos: dict):
'''
:param infos: step and feature information for the current step
:return: extracted reward
'''
# rewards design
r = 0
if infos["is_colliding"] == 1:
r -= 1
if infos["is_offroad"] == 1:
r -= 1
return np.array([r]).reshape(1, -1)
def _compute_feature_infos(self, features: list):
'''
:param features: raw features
:return: feature information computed from the raw features
'''
features = features[0] # features size is (n_agent, n_features), so here is (1, 66)
is_colliding = features[self.infos_cache["is_colliding_idx"]]
markerdist_left = features[self.infos_cache["markerdist_left_idx"]]
markerdist_right = features[self.infos_cache["markerdist_right_idx"]]
is_offroad = features[self.infos_cache["out_of_lane_idx"]]
return {
"is_colliding": is_colliding,
"markerdist_left": markerdist_left,
"markerdist_right": markerdist_right,
"is_offroad": is_offroad
}
def step(self, action: list):
'''
:param action: the action to take in the current step
:return: features of the next state, reward for taking the action, whether the current episode
has terminated, and step information
'''
# action[0] is `a::Float64` longitudinal acceleration [m/s^2]
# action[1] is `ω::Float64` desired turn rate [rad/sec]
step_infos = self._step(action)
# compute features and feature_infos
features = self.get_features()
feature_infos = self._compute_feature_infos(features)
# combine infos
infos = dict(**step_infos, **feature_infos)
# update env timestep to be the next scene to load
self.t += 1
# compute terminal
if self.t >= self.h:
terminal = True
elif self.terminate_on_collision and infos["is_colliding"] == 1:
terminal = True
elif self.terminate_on_off_road and (abs(infos["markerdist_left"]) > 3 and abs(infos["markerdist_right"]) > 3):
terminal = True
else:
terminal = False
terminal = [terminal]
reward = self._extract_rewards(infos)
return features, reward, terminal, infos
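# Illustrative call (a usage sketch): `action` is expected to be a
# (n_agents, n_actions) array of [longitudinal acceleration m/s^2, turn rate rad/s];
# this single-agent env only reads row 0, e.g.:
#
#   features, reward, terminal, infos = env.step(np.array([[0.5, 0.0]]))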
def get_features(self):
'''
:return: extracted features for the current state
'''
veh_idx = self.scene.findfirst(self.egoid)
self.ext.pull_features(
self.rec,
self.roadway,
veh_idx
)
return np.array(copy.deepcopy(self.ext.features)).reshape(1, -1)
def observation_space_spec(self):
'''
:return: observation space specifications
'''
low = [0 for i in range(len(self.ext))]
high = [0 for i in range(len(self.ext))]
feature_infos = self.ext.feature_info()
for (i, fn) in enumerate(self.ext.feature_names()):
low[i] = feature_infos[fn]["low"]
high[i] = feature_infos[fn]["high"]
infos = {"high": high, "low": low}
return (len(self.ext),), "Box", infos
def action_space_spec(self):
'''
:return: action space specifications
'''
return (2,), "Box", {"high": [4., .15], "low": [-4., -.15]}
def obs_names(self):
'''
:return: feature names
'''
return self.ext.feature_names()
@property
def action_space(self):
_, _, action_space = self.action_space_spec()
return action_space
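# A rollout-loop sketch (assumes `env = AutoEnv(params)` was constructed with a
# params dict as sketched above reset(); the zero action is illustrative only):
#
#   obs = env.reset()
#   terminal = [False]
#   while not terminal[0]:
#       obs, reward, terminal, infos = env.step(np.array([[0.0, 0.0]]))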