from envs.utils import (dict_get, max_n_objects, fill_infos_cache, sample_multiple_trajdata_vehicle,
                        load_ngsim_trajdatas, keep_vehicle_subset, select_multiple_trajdata_vehicle, normalize)
from src.Record.frame import Frame
from src.Record.record import SceneRecord, get_scene
from src.Basic.Vehicle import Vehicle
from feature_extractor.utils import build_feature_extractor
from envs.action import AccelTurnrate, propagate
from src.Vec.VecE2 import norm
import copy
import numpy as np
class MultiAgentAutoEnv:
def __init__(self, params: dict, trajdatas: list = None, trajinfos: list = None, roadways: list = None,
reclength: int = 5, delta_t: float = .1, primesteps: int = 50, H: int = 50,
n_veh: int = 20, remove_ngsim_veh: bool = False, reward: int = 0,
                 render_params: dict = None):
        '''
        :param params: env parameters
        :param trajdatas: list of trajectory data records (one ListRecord per trajectory file)
        :param trajinfos: list of dicts with per-trajectory metadata
        :param roadways: list of Roadway objects
        :param reclength: record length (number of frames kept in the scene record)
        :param delta_t: simulation timestep in seconds
        :param primesteps: timesteps used to prime the scene before control starts
        :param H: maximum horizon
        :param n_veh: number of controlled vehicles
        :param remove_ngsim_veh: if True, keep only the ego vehicles and drop the remaining NGSIM vehicles from the scene
        :param reward: penalty magnitude applied to collision, offroad, and hard-brake events (see _extract_rewards)
        :param render_params: rendering options; defaults to {"zoom": 5., "viz_dir": "/tmp"}
        '''
param_keys = params.keys()
assert "trajectory_filepaths" in param_keys
# optionally overwrite defaults
reclength = dict_get(params, "reclength", reclength)
primesteps = dict_get(params, "primesteps", primesteps)
H = dict_get(params, "H", H)
n_veh = dict_get(params, "n_veh", n_veh)
remove_ngsim_veh = dict_get(params, "remove_ngsim_veh", remove_ngsim_veh)
reward = dict_get(params, "reward", reward)
        # merge any render params passed via params over the defaults,
        # avoiding mutation of a shared default argument
        if render_params is None:
            render_params = {"zoom": 5., "viz_dir": "/tmp"}
        render_params = dict(render_params)
        render_params.update(dict_get(params, "render_params", {}))
if trajdatas is None or trajinfos is None or roadways is None:
trajdatas, trajinfos, roadways = load_ngsim_trajdatas(
params["trajectory_filepaths"],
minlength=primesteps + H
)
# build components
scene_length = max_n_objects(trajdatas)
scene = Frame()
scene.init(scene_length)
rec = SceneRecord()
rec.init(reclength, delta_t, scene_length)
ext = build_feature_extractor(params)
infos_cache = fill_infos_cache(ext)
        # features are stored per vehicle in row-major order; this layout is kept
        # from the original Julia implementation, where it was required for the
        # Julia/Python interaction -- do not change the ordering, downstream code
        # depends on it
features = [[0 for _ in range(len(ext))] for _ in range(n_veh)]
egoids = [-1 for _ in range(n_veh)]
ego_vehs = [None for _ in range(n_veh)]
self.trajdatas = trajdatas
self.trajinfos = trajinfos
self.roadways = roadways
self.roadway = None
self.scene = scene
self.rec = rec
self.ext = ext
self.egoids = egoids
self.ego_vehs = ego_vehs
self.traj_idx = 0
self.t = 0
self.h = 0
self.H = H
self.primesteps = primesteps
self.delta_t = delta_t
self.n_veh = n_veh
self.remove_ngsim_veh = remove_ngsim_veh
self.features = features
self.reward = reward
self.epid = 0
self.render_params = render_params
self.infos_cache = infos_cache
    '''
    Description:
        Reset the environment. Note that this environment maintains the following
        invariant attribute: at any given point, all vehicles currently being
        controlled will end their episode at the same time. This simplifies the
        rest of the code by enforcing synchronized restarts, but it does somewhat
        limit the sets of vehicles that can interact simultaneously. With a small
        enough horizon (H <= 250, i.e., 25 seconds) and number of vehicles
        (n_veh <= 100) this should not be a problem. If you need to run with
        larger values than these, implement an environment with asynchronous resets.
    Args:
        - env: env to reset
        - dones: bool vector indicating which indices have reached a terminal state;
          these must be either all true or all false
    '''
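    # Illustrative example (not from the original code) of the synchronized-reset
    # contract above, assuming n_veh == 2:
    #   env.reset(dones=[True, True])   -> resamples both agents and returns features
    #   env.reset(dones=[False, False]) -> no-op, returns None
    #   env.reset(dones=[True, False])  -> violates the invariant and trips the assert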
    def reset(self, dones: list = None, offset: int = None, random_seed: int = None,
              egoid: int = None, traj_idx: int = None, start: int = None):
        '''
        :param dones: bool flags indicating which agents have finished their episodes;
                      must be either all True or all False
        :param offset: offset frame
        :param random_seed: random seed
        :param egoid: id for the ego vehicle
        :param traj_idx: trajectory index
        :param start: start time step
        :return: features for the reset state, or None if no reset was performed
        '''
if offset is None:
offset = self.H + self.primesteps
if dones is None:
dones = [True for _ in range(self.n_veh)]
# enforce environment invariant reset property
# (i.e., always either all true or all false)
assert len(set(dones)) == 1
# first == all at this point, so if first is false, skip the reset
if not dones[0]:
return
# sample multiple ego vehicles
# as stated above, these will all end at the same timestep
if traj_idx is None or egoid is None or start is None:
# print("===sampling===")
self.traj_idx, self.egoids, self.t, self.h = sample_multiple_trajdata_vehicle(
self.n_veh,
self.trajinfos,
offset,
rseed=random_seed
)
else:
# print("===selecting===")
self.traj_idx, self.egoids, self.t, self.h = select_multiple_trajdata_vehicle(
self.n_veh,
self.trajinfos,
offset,
egoid=egoid,
traj_idx=traj_idx,
period_start=start,
rseed=random_seed
)
# print("==========")
# print("env.egoid:", self.egoids)
# print("env.t:", self.t)
# print("env.h:", self.h)
# print("env.primesteps:", self.primesteps)
self.epid += 1
self.rec.empty()
self.scene.empty()
# prime
for t in range(self.t, (self.t + self.primesteps + 1)):
self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], t)
if self.remove_ngsim_veh:
self.scene = keep_vehicle_subset(self.scene, self.egoids)
self.rec.update(self.scene)
# set the ego vehicle
for (i, egoid) in enumerate(self.egoids):
vehidx = self.scene.findfirst(egoid)
self.ego_vehs[i] = self.scene[vehidx]
# set the roadway
self.roadway = self.roadways[self.traj_idx]
# self.t is the next timestep to load
self.t += self.primesteps + 1
# enforce a maximum horizon
self.h = min(self.h, self.t + self.H)
features = self.get_features()
# print("env reset features: ", features)
# print("reset frame: {}".format(self.t))
return features
def _step(self, action: list):
'''
:param action: action for the current step
:return: step information
'''
# make sure number of actions passed in equals number of vehicles
assert len(action) == self.n_veh
# print("==========================================")
ego_states = [None for _ in range(self.n_veh)]
for (i, ego_veh) in enumerate(self.ego_vehs):
            # convert the raw action into an (acceleration, turn-rate) command
            ego_action = AccelTurnrate(action[i][0], action[i][1])
# propagate the ego vehicle
# print(action[i][0], action[i][1])
ego_state = propagate(
ego_veh,
ego_action,
self.roadway,
self.delta_t
)
# update the ego_veh
self.ego_vehs[i] = Vehicle(ego_state, ego_veh.definition, ego_veh.id)
        # load the actual scene; the controlled vehicles may optionally replace
        # their NGSIM counterparts below
# print("frame: {}, vehid: ".format(self.t), self.egoids)
self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], self.t)
if self.remove_ngsim_veh:
self.scene = keep_vehicle_subset(self.scene, self.egoids)
orig_vehs = [None for _ in range(self.n_veh)] # for infos purposes
for (i, egoid) in enumerate(self.egoids):
vehidx = self.scene.findfirst(egoid)
# track the original vehicle for validation / infos purposes
orig_vehs[i] = self.scene[vehidx]
            # optionally replace the original vehicle with the controlled one;
            # note that this is currently disabled, so the recorded scene (and the
            # feature extractor) keeps the original NGSIM ground-truth vehicle
            replace_with_ego = False
            if replace_with_ego:
                self.scene[vehidx] = self.ego_vehs[i]
# update rec with current scene
self.rec.update(self.scene)
# compute info about the step
step_infos = {
"rmse_pos": [],
"rmse_vel": [],
"rmse_t": [],
"x": [],
"y": [],
"v": [],
"s": [],
"phi": [],
"orig_x": [],
"orig_y": [],
"orig_v": [],
"orig_theta": [],
"orig_length": [],
"orig_width": [],
"lane_id": []
}
# print("orig x: {}, orig y: {}, orig v: {}".format(orig_vehs[0].state.posG.x, orig_vehs[0].state.posG.y,
# orig_vehs[0].state.v))
# print("predict x: {}, predict y: {}".format(self.ego_vehs[0].state.posG.x, self.ego_vehs[0].state.posG.y))
for i in range(self.n_veh):
step_infos["rmse_pos"].append(norm((orig_vehs[i].state.posG - self.ego_vehs[i].state.posG)))
step_infos["rmse_vel"].append(abs(orig_vehs[i].state.v - self.ego_vehs[i].state.v))
step_infos["rmse_t"].append(abs((orig_vehs[i].state.posF.t - self.ego_vehs[i].state.posF.t)))
step_infos["x"].append(self.ego_vehs[i].state.posG.x)
step_infos["y"].append(self.ego_vehs[i].state.posG.y)
step_infos["v"].append(self.ego_vehs[i].state.v)
step_infos["s"].append(self.ego_vehs[i].state.posF.s)
step_infos["phi"].append(self.ego_vehs[i].state.posF.phi)
step_infos["orig_x"].append(orig_vehs[i].state.posG.x)
step_infos["orig_y"].append(orig_vehs[i].state.posG.y)
step_infos["orig_v"].append(orig_vehs[i].state.v)
step_infos["orig_theta"].append(orig_vehs[i].state.posG.theta)
step_infos["orig_length"].append(orig_vehs[i].definition.length_)
step_infos["orig_width"].append(orig_vehs[i].definition.width_)
step_infos["lane_id"].append(orig_vehs[i].state.posF.roadind.tag.lane)
return step_infos
def _extract_rewards(self, infos: dict):
'''
:param infos: information dictionary
:return: rewards for each agent
'''
# rewards design
rewards = [0 for _ in range(self.n_veh)]
for i in range(self.n_veh):
reward_col = infos["is_colliding"][i] * self.reward
reward_off = infos["is_offroad"][i] * self.reward
reward_brake = infos["hard_brake"][i] * self.reward * 0.5 # braking hard is not as bad as a collision
rewards[i] = -max(reward_col, reward_off, reward_brake)
return np.array(rewards)
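    # Worked example of the reward design above (illustrative only), assuming
    # self.reward == 1: a vehicle with is_colliding == 1, is_offroad == 0 and
    # hard_brake == 0.4 gets reward_col = 1, reward_off = 0, reward_brake = 0.2,
    # so its reward is -max(1, 0, 0.2) = -1; a vehicle with no event gets 0.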
    def _compute_feature_infos(self, features: list, accel_thresh_min: float = -2.0, accel_thresh: float = -3.0,
                               min_d_edge_thresh: float = 0.5, offroad_thresh: float = -0.1):
        '''
        :param features: extracted features, one row per vehicle
        :param accel_thresh_min: acceleration below which braking starts to count as a hard brake
        :param accel_thresh: acceleration at which the hard-brake measure saturates at 1
        :param min_d_edge_thresh: distance to the nearest road edge below which the vehicle counts as nearly offroad
        :param offroad_thresh: road-edge distance at which the offroad measure reaches 1
        :return: feature information
        '''
feature_infos = {
"is_colliding": [],
"is_offroad": [],
"hard_brake": []
}
for i in range(self.n_veh):
is_colliding = features[i][self.infos_cache["is_colliding_idx"]]
feature_infos["is_colliding"].append(is_colliding)
accel = features[i][self.infos_cache["accel_idx"]]
if accel <= accel_thresh_min:
normalized_accel = abs((accel - accel_thresh_min) / (accel_thresh - accel_thresh_min))
# linearly increase penalty up to accel_thresh
feature_infos["hard_brake"].append(min(1, normalized_accel))
else:
feature_infos["hard_brake"].append(0)
is_offroad = features[i][self.infos_cache["out_of_lane_idx"]]
if is_offroad < 1:
d_left = features[i][self.infos_cache["distance_road_edge_left_idx"]]
d_right = features[i][self.infos_cache["distance_road_edge_right_idx"]]
closest_d = min(d_left, d_right)
if closest_d <= min_d_edge_thresh: # meaning too close
is_offroad = abs((closest_d - min_d_edge_thresh) / (offroad_thresh - min_d_edge_thresh))
feature_infos["is_offroad"].append(is_offroad)
# print(feature_infos)
return feature_infos
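    # Worked example of the scaling above (illustrative only), using the default thresholds:
    #   accel = -2.5 -> normalized_accel = |(-2.5 - (-2.0)) / (-3.0 - (-2.0))| = 0.5,
    #   so hard_brake = 0.5; accel = -3.5 would clamp to 1.
    #   closest_d = 0.2 -> is_offroad = |(0.2 - 0.5) / (-0.1 - 0.5)| = 0.5.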
    def step(self, action: list):
        '''
        :param action: list of actions to take in the current step, one per vehicle
        :return: features for the next state, per-vehicle rewards, per-vehicle terminal flags,
                 and an info dictionary
        '''
# action is a list of actions, length equals to self.n_veh
step_infos = self._step(action)
# compute features and feature_infos
features = self.get_features()
feature_infos = self._compute_feature_infos(list(features))
# combine infos
infos = dict(**step_infos, **feature_infos)
# update env timestep to be the next scene to load
self.t += 1
        # compute terminal
        terminal = self.t > self.h
terminal = [terminal for _ in range(self.n_veh)]
        # the vectorized sampler does not call reset on the environment but expects
        # the environment to handle resetting; that automatic reset (which mutates
        # env.features so the correct obs are returned) is currently disabled here
        # self.reset(terminal)
rewards = self._extract_rewards(infos)
# print(rewards)
return copy.deepcopy(features), rewards, terminal, infos
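    # Shape note (illustrative, not from the original code): with n_veh controlled
    # vehicles, step returns features of shape (n_veh, len(ext)), an (n_veh,) reward
    # array, a list of n_veh terminal flags, and an infos dict of per-vehicle lists.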
    def get_features(self, normalize_feature=False, clip_std_multiple=10.0):
        '''
        :param normalize_feature: whether to normalize the extracted features
        :param clip_std_multiple: std clipping threshold passed to normalize
        :return: extracted features, one row per controlled vehicle
        '''
for (i, egoid) in enumerate(self.egoids):
veh_idx = self.scene.findfirst(egoid)
self.ext.pull_features(self.rec, self.roadway, veh_idx)
self.features[i] = copy.deepcopy(self.ext.features)
retval = np.array(copy.deepcopy(self.features))
if normalize_feature:
retval, obs_mean, obs_std = normalize(retval, clip_std_multiple)
return retval
    def observation_space_spec(self):
'''
:return: observation space specification
'''
low = [0 for i in range(len(self.ext))]
high = [0 for i in range(len(self.ext))]
feature_infos = self.ext.feature_info()
for (i, fn) in enumerate(self.ext.feature_names()):
low[i] = feature_infos[fn]["low"]
high[i] = feature_infos[fn]["high"]
infos = {"high": high, "low": low}
return (len(self.ext),), "Box", infos
    def action_space_spec(self):
'''
:return: action space specifications
'''
return (2,), "Box", {"high": [4., .15], "low": [-4., -.15]}
    def obs_names(self):
        '''
        :return: names of the observation features
        '''
        return self.ext.feature_names()
    def vectorized(self):
return True
    def num_envs(self):
return self.n_veh
@property
def action_space(self):
_, _, action_space = self.action_space_spec()
return action_space
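# A minimal usage sketch (not part of the original module): instantiate the env,
# roll out one synchronized episode with random actions, and reset when every
# agent is done. The trajectory file path below is a hypothetical placeholder,
# and build_feature_extractor may require additional keys in params depending on
# the feature-extractor configuration of this repo.
if __name__ == "__main__":
    example_params = {
        "trajectory_filepaths": ["/path/to/ngsim_trajdata_file"],  # hypothetical path
        "n_veh": 2,
        "H": 50,
    }
    env = MultiAgentAutoEnv(example_params)
    obs = env.reset()
    dones = [False] * env.n_veh
    while not all(dones):
        # sample accelerations / turn rates uniformly within the declared bounds
        _, _, bounds = env.action_space_spec()
        actions = np.random.uniform(bounds["low"], bounds["high"], size=(env.n_veh, 2))
        obs, rewards, dones, infos = env.step(actions)
    # all agents terminate together, so a single synchronized reset restarts them
    obs = env.reset(dones)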