Source code for envs.multi_agent_env

from envs.utils import dict_get, max_n_objects, fill_infos_cache, sample_multiple_trajdata_vehicle, load_ngsim_trajdatas,\
    keep_vehicle_subset, select_multiple_trajdata_vehicle, normalize
from src.Record.frame import Frame
from src.Record.record import SceneRecord, get_scene
from src.Basic.Vehicle import Vehicle
from feature_extractor.utils import build_feature_extractor
from envs.action import AccelTurnrate, propagate
from src.Vec.VecE2 import norm
import copy
import numpy as np


class MultiAgentAutoEnv:

    def __init__(self,
                 params: dict,
                 trajdatas: list = None,
                 trajinfos: list = None,
                 roadways: list = None,
                 reclength: int = 5,
                 delta_t: float = .1,
                 primesteps: int = 50,
                 H: int = 50,
                 n_veh: int = 20,
                 remove_ngsim_veh: bool = False,
                 reward: int = 0,
                 render_params: dict = None):
        '''
        :param params: env parameters
        :param trajdatas: Vector{ListRecord}
        :param trajinfos: Vector{Dict}
        :param roadways: Vector{Roadway}
        :param reclength: record length
        :param delta_t: Δt::Float64
        :param primesteps: timesteps to prime the scene
        :param H: maximum horizon
        :param n_veh: number of vehicles
        :param remove_ngsim_veh: whether to remove the non-ego NGSIM vehicles from the scene
        :param reward: reward
        :param render_params: rendering options (defaults to {"zoom": 5., "viz_dir": "/tmp"})
        '''
        # use None as the default to avoid a mutable default argument shared across instances
        if render_params is None:
            render_params = {"zoom": 5., "viz_dir": "/tmp"}

        param_keys = params.keys()
        assert "trajectory_filepaths" in param_keys

        # optionally overwrite defaults
        reclength = dict_get(params, "reclength", reclength)
        primesteps = dict_get(params, "primesteps", primesteps)
        H = dict_get(params, "H", H)
        n_veh = dict_get(params, "n_veh", n_veh)
        remove_ngsim_veh = dict_get(params, "remove_ngsim_veh", remove_ngsim_veh)
        reward = dict_get(params, "reward", reward)
        for (k, v) in dict_get(params, "render_params", render_params).items():
            render_params[k] = v

        if trajdatas is None or trajinfos is None or roadways is None:
            trajdatas, trajinfos, roadways = load_ngsim_trajdatas(
                params["trajectory_filepaths"],
                minlength=primesteps + H
            )

        # build components
        scene_length = max_n_objects(trajdatas)
        scene = Frame()
        scene.init(scene_length)
        rec = SceneRecord()
        rec.init(reclength, delta_t, scene_length)
        ext = build_feature_extractor(params)
        infos_cache = fill_infos_cache(ext)

        # features are stored in row-major order because they will be transferred
        # to python; this is inefficient in julia, but don't change it or it will
        # break the python side of the interaction
        features = [[0 for _ in range(len(ext))] for _ in range(n_veh)]
        egoids = [-1 for _ in range(n_veh)]
        ego_vehs = [None for _ in range(n_veh)]

        self.trajdatas = trajdatas
        self.trajinfos = trajinfos
        self.roadways = roadways
        self.roadway = None
        self.scene = scene
        self.rec = rec
        self.ext = ext
        self.egoids = egoids
        self.ego_vehs = ego_vehs
        self.traj_idx = 0
        self.t = 0
        self.h = 0
        self.H = H
        self.primesteps = primesteps
        self.delta_t = delta_t
        self.n_veh = n_veh
        self.remove_ngsim_veh = remove_ngsim_veh
        self.features = features
        self.reward = reward
        self.epid = 0
        self.render_params = render_params
        self.infos_cache = infos_cache

    '''
    Description:
        Reset the environment. Note that this environment maintains the following
        invariant attribute: at any given point, all vehicles currently being
        controlled will end their episode at the same time. This simplifies the
        rest of the code by enforcing synchronized restarts, but it does somewhat
        limit the sets of possible vehicles that can simultaneously interact.
        With a small enough minimum horizon (H <= 250 = 25 seconds) and number of
        vehicles (n_veh <= 100) this should not be a problem. If you need to run
        with larger numbers than those, implement an environment with
        asynchronous resets.

    Args:
        - env: env to reset
        - dones: bool vector indicating which indices have reached a terminal state;
          these must be either all true or all false
    '''
    def reset(self,
              dones: list = None,
              offset: int = None,
              random_seed: int = None,
              egoid: int = None,
              traj_idx: int = None,
              start: int = None):
        '''
        :param dones: flags indicating which agents have finished their episodes
        :param offset: offset frame
        :param random_seed: random seed
        :param egoid: id of the ego vehicle
        :param traj_idx: trajectory index
        :param start: start time step
        :return: features for the reset state
        '''
        if offset is None:
            offset = self.H + self.primesteps
        if dones is None:
            dones = [True for _ in range(self.n_veh)]

        # enforce the environment invariant reset property
        # (i.e., the dones are always either all true or all false)
        assert len(set(dones)) == 1
        # first == all at this point, so if first is false, skip the reset
        if not dones[0]:
            return

        # sample multiple ego vehicles
        # as stated above, these will all end at the same timestep
        if traj_idx is None or egoid is None or start is None:
            self.traj_idx, self.egoids, self.t, self.h = sample_multiple_trajdata_vehicle(
                self.n_veh,
                self.trajinfos,
                offset,
                rseed=random_seed
            )
        else:
            self.traj_idx, self.egoids, self.t, self.h = select_multiple_trajdata_vehicle(
                self.n_veh,
                self.trajinfos,
                offset,
                egoid=egoid,
                traj_idx=traj_idx,
                period_start=start,
                rseed=random_seed
            )

        self.epid += 1
        self.rec.empty()
        self.scene.empty()

        # prime the scene by replaying the recorded frames for primesteps timesteps
        for t in range(self.t, (self.t + self.primesteps + 1)):
            self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], t)
            if self.remove_ngsim_veh:
                self.scene = keep_vehicle_subset(self.scene, self.egoids)
            self.rec.update(self.scene)

        # set the ego vehicles
        for (i, egoid) in enumerate(self.egoids):
            vehidx = self.scene.findfirst(egoid)
            self.ego_vehs[i] = self.scene[vehidx]

        # set the roadway
        self.roadway = self.roadways[self.traj_idx]
        # self.t is the next timestep to load
        self.t += self.primesteps + 1
        # enforce a maximum horizon
        self.h = min(self.h, self.t + self.H)
        features = self.get_features()
        return features
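
    # Worked example of the bookkeeping above, assuming the sampler returns
    # self.t = 100 and self.h = 400 with primesteps = 50 and H = 200: frames
    # 100..150 prime the record, self.t then advances to 151 (the next frame to
    # load), and self.h becomes min(400, 151 + 200) = 351, so the episode ends
    # once self.t exceeds 351.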
    def _step(self, action: list):
        '''
        :param action: actions for the current step, one per vehicle
        :return: step information
        '''
        # make sure the number of actions passed in equals the number of vehicles
        assert len(action) == self.n_veh
        ego_states = [None for _ in range(self.n_veh)]
        for (i, ego_veh) in enumerate(self.ego_vehs):
            # convert the action into an AccelTurnrate
            ego_action = AccelTurnrate(action[i][0], action[i][1])
            # propagate the ego vehicle
            ego_state = propagate(
                ego_veh, ego_action, self.roadway, self.delta_t
            )
            # update the ego_veh
            self.ego_vehs[i] = Vehicle(ego_state, ego_veh.definition, ego_veh.id)

        # load the actual scene, and insert the vehicle into it
        self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], self.t)
        if self.remove_ngsim_veh:
            self.scene = keep_vehicle_subset(self.scene, self.egoids)
        orig_vehs = [None for _ in range(self.n_veh)]  # for infos purposes
        for (i, egoid) in enumerate(self.egoids):
            vehidx = self.scene.findfirst(egoid)
            # track the original vehicle for validation / infos purposes
            orig_vehs[i] = self.scene[vehidx]
            # replace the original with the controlled vehicle
            # (currently gated off by test = False)
            test = False
            if test:
                self.scene[vehidx] = self.ego_vehs[i]

        # update rec with current scene
        self.rec.update(self.scene)

        # compute info about the step
        step_infos = {
            "rmse_pos": [],
            "rmse_vel": [],
            "rmse_t": [],
            "x": [],
            "y": [],
            "v": [],
            "s": [],
            "phi": [],
            "orig_x": [],
            "orig_y": [],
            "orig_v": [],
            "orig_theta": [],
            "orig_length": [],
            "orig_width": [],
            "lane_id": []
        }
        for i in range(self.n_veh):
            step_infos["rmse_pos"].append(norm((orig_vehs[i].state.posG - self.ego_vehs[i].state.posG)))
            step_infos["rmse_vel"].append(abs(orig_vehs[i].state.v - self.ego_vehs[i].state.v))
            step_infos["rmse_t"].append(abs((orig_vehs[i].state.posF.t - self.ego_vehs[i].state.posF.t)))
            step_infos["x"].append(self.ego_vehs[i].state.posG.x)
            step_infos["y"].append(self.ego_vehs[i].state.posG.y)
            step_infos["v"].append(self.ego_vehs[i].state.v)
            step_infos["s"].append(self.ego_vehs[i].state.posF.s)
            step_infos["phi"].append(self.ego_vehs[i].state.posF.phi)
            step_infos["orig_x"].append(orig_vehs[i].state.posG.x)
            step_infos["orig_y"].append(orig_vehs[i].state.posG.y)
            step_infos["orig_v"].append(orig_vehs[i].state.v)
            step_infos["orig_theta"].append(orig_vehs[i].state.posG.theta)
            step_infos["orig_length"].append(orig_vehs[i].definition.length_)
            step_infos["orig_width"].append(orig_vehs[i].definition.width_)
            step_infos["lane_id"].append(orig_vehs[i].state.posF.roadind.tag.lane)

        return step_infos

    def _extract_rewards(self, infos: dict):
        '''
        :param infos: information dictionary
        :return: rewards for each agent
        '''
        # rewards design
        rewards = [0 for _ in range(self.n_veh)]
        for i in range(self.n_veh):
            reward_col = infos["is_colliding"][i] * self.reward
            reward_off = infos["is_offroad"][i] * self.reward
            # braking hard is not as bad as a collision
            reward_brake = infos["hard_brake"][i] * self.reward * 0.5
            rewards[i] = -max(reward_col, reward_off, reward_brake)
        return np.array(rewards)
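
    # Worked example of the reward rule above, assuming self.reward = 1 and, for some
    # agent i, is_colliding = 1, is_offroad = 0 and hard_brake = 0.6: the candidate
    # penalties are reward_col = 1.0, reward_off = 0.0 and reward_brake = 0.3, so the
    # agent receives -max(1.0, 0.0, 0.3) = -1.0; only the single worst violation is
    # penalized each step.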

    def _compute_feature_infos(self, features: list,
                               accel_thresh_min: float = -2.0,
                               accel_thresh: float = -3.0,
                               min_d_edge_thresh: float = 0.5,
                               offroad_thresh: float = -0.1):
        '''
        :param features: features
        :param accel_thresh_min: acceleration at which the hard-brake penalty starts
        :param accel_thresh: acceleration at which the hard-brake penalty saturates
        :param min_d_edge_thresh: distance to the road edge below which the offroad penalty starts
        :param offroad_thresh: offroad feature threshold
        :return: feature information
        '''
        feature_infos = {
            "is_colliding": [],
            "is_offroad": [],
            "hard_brake": []
        }
        for i in range(self.n_veh):
            is_colliding = features[i][self.infos_cache["is_colliding_idx"]]
            feature_infos["is_colliding"].append(is_colliding)

            accel = features[i][self.infos_cache["accel_idx"]]
            if accel <= accel_thresh_min:
                normalized_accel = abs((accel - accel_thresh_min) / (accel_thresh - accel_thresh_min))
                # linearly increase the penalty up to accel_thresh
                feature_infos["hard_brake"].append(min(1, normalized_accel))
            else:
                feature_infos["hard_brake"].append(0)

            is_offroad = features[i][self.infos_cache["out_of_lane_idx"]]
            if is_offroad < 1:
                d_left = features[i][self.infos_cache["distance_road_edge_left_idx"]]
                d_right = features[i][self.infos_cache["distance_road_edge_right_idx"]]
                closest_d = min(d_left, d_right)
                if closest_d <= min_d_edge_thresh:  # meaning too close
                    is_offroad = abs((closest_d - min_d_edge_thresh) / (offroad_thresh - min_d_edge_thresh))
            feature_infos["is_offroad"].append(is_offroad)

        return feature_infos
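
    # Worked example of the shaping above with the default thresholds: an acceleration
    # of -2.5 gives hard_brake = min(1, |(-2.5 - (-2.0)) / (-3.0 - (-2.0))|) = 0.5, and a
    # closest road-edge distance of 0.2 gives is_offroad = |(0.2 - 0.5) / (-0.1 - 0.5)| = 0.5.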
    def step(self, action: list):
        '''
        :param action: the actions to take in the current step
        :return: features for the next state, rewards for taking the actions,
                 whether the current episode has terminated, and information
        '''
        # action is a list of actions whose length equals self.n_veh
        step_infos = self._step(action)

        # compute features and feature_infos
        features = self.get_features()
        feature_infos = self._compute_feature_infos(list(features))
        # combine infos
        infos = dict(**step_infos, **feature_infos)

        # update env timestep to be the next scene to load
        self.t += 1
        # compute terminal
        if self.t > self.h:
            terminal = True
        else:
            terminal = False
        terminal = [terminal for _ in range(self.n_veh)]
        # vectorized sampler does not call reset on the environment
        # but expects the environment to handle resetting, so do that here
        # note: this mutates env.features in order to return the correct obs when resetting
        # self.reset(terminal)
        rewards = self._extract_rewards(infos)
        return copy.deepcopy(features), rewards, terminal, infos
    def get_features(self, normalize_feature=False, clip_std_multiple=10.0):
        '''
        :param normalize_feature: whether to normalize the features
        :param clip_std_multiple: clipping threshold, in multiples of the standard deviation
        :return: extracted features
        '''
        for (i, egoid) in enumerate(self.egoids):
            veh_idx = self.scene.findfirst(egoid)
            self.ext.pull_features(self.rec, self.roadway, veh_idx)
            self.features[i] = copy.deepcopy(self.ext.features)
        retval = np.array(copy.deepcopy(self.features))
        if normalize_feature:
            retval, obs_mean, obs_std = normalize(retval, clip_std_multiple)
        return retval
    def observation_space_spec(self):
        '''
        :return: observation space specification
        '''
        low = [0 for _ in range(len(self.ext))]
        high = [0 for _ in range(len(self.ext))]
        feature_infos = self.ext.feature_info()
        for (i, fn) in enumerate(self.ext.feature_names()):
            low[i] = feature_infos[fn]["low"]
            high[i] = feature_infos[fn]["high"]
        infos = {"high": high, "low": low}
        return (len(self.ext),), "Box", infos
    def action_space_spec(self):
        '''
        :return: action space specification
        '''
        return (2,), "Box", {"high": [4., .15], "low": [-4., -.15]}
    def obs_names(self):
        '''
        :return: names of the observations
        '''
        return self.ext.feature_names()
    def vectorized(self):
        return True

    def num_envs(self):
        return self.n_veh
    @property
    def action_space(self):
        _, _, action_space = self.action_space_spec()
        return action_space
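

# A minimal usage sketch, assuming a params dict that build_feature_extractor and
# load_ngsim_trajdatas accept; the trajectory file path and the constant action
# below are placeholders, not values taken from this module.
if __name__ == "__main__":
    params = {
        "trajectory_filepaths": ["data/example_trajdata.txt"],  # placeholder path
        "n_veh": 10,
        "H": 200,
    }
    env = MultiAgentAutoEnv(params)
    obs = env.reset()
    for _ in range(env.H):
        # drive every vehicle with a zero-acceleration, zero-turn-rate action
        actions = [[0.0, 0.0] for _ in range(env.n_veh)]
        obs, rewards, terminals, infos = env.step(actions)
        if all(terminals):
            break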