# Source code for envs.base

from envs.utils import dict_get, max_n_objects, fill_infos_cache, sample_trajdata_vehicle, load_ngsim_trajdatas
from src.Record.frame import Frame
from src.Record.record import SceneRecord, get_scene
from src.Basic.Vehicle import Vehicle
from feature_extractor.utils import build_feature_extractor
from envs.action import AccelTurnrate, propagate
from src.Vec.VecE2 import norm
import numpy as np
import copy


class AutoEnv:
    '''Single-agent driving environment replaying NGSIM-style trajectory data.

    The env primes a scene record from recorded trajectories, then lets an
    ego vehicle act via (acceleration, turn-rate) commands while the other
    vehicles replay their recorded states.
    '''

    def __init__(self, params: dict, trajdatas: list = None, trajinfos: list = None,
                 roadways: list = None, reclength: int = 5, delta_t: float = .1,
                 primesteps: int = 50, H: int = 50,
                 terminate_on_collision: bool = True,
                 terminate_on_off_road: bool = True,
                 render_params: dict = None):
        '''
        :param params: config dict; must contain "trajectory_filepaths" and may
            override reclength / primesteps / H / render_params /
            terminate_on_collision / terminate_on_off_road
        :param trajdatas: Vector{ListRecord}
        :param trajinfos: Vector{Dict}
        :param roadways: Vector{Roadway}
        :param reclength: length of record
        :param delta_t: timestep in seconds
        :param primesteps: time steps to prime the scene
        :param H: maximum horizon
        :param terminate_on_collision: end episode on collision
        :param terminate_on_off_road: end episode when off the road
        :param render_params: rendering options; defaults to
            {"zoom": 5., "viz_dir": "/tmp"} when None
        '''
        assert "trajectory_filepaths" in params

        # BUGFIX: the old signature used a mutable default dict for
        # render_params and mutated it below, leaking merged options across
        # instances (and mutating dicts passed in by callers). Use a None
        # sentinel and merge into a fresh copy instead.
        if render_params is None:
            render_params = {"zoom": 5., "viz_dir": "/tmp"}
        render_params = dict(render_params)
        render_params.update(dict_get(params, "render_params", {}))

        # optionally overwrite defaults from params
        reclength = dict_get(params, "reclength", reclength)
        primesteps = dict_get(params, "primesteps", primesteps)
        H = dict_get(params, "H", H)
        terminate_on_collision = dict_get(params, "terminate_on_collision",
                                          terminate_on_collision)
        terminate_on_off_road = dict_get(params, "terminate_on_off_road",
                                         terminate_on_off_road)

        # load trajectory data only when not injected by the caller
        if trajdatas is None or trajinfos is None or roadways is None:
            trajdatas, trajinfos, roadways = load_ngsim_trajdatas(
                params["trajectory_filepaths"],
                minlength=primesteps + H
            )
        print("traj_data_len: ", len(trajdatas))
        print("trajinfos: ", trajinfos)

        # build components sized for the busiest frame in the data
        scene_length = max_n_objects(trajdatas)
        scene = Frame()
        scene.init(scene_length)
        rec = SceneRecord()
        rec.init(reclength, delta_t, scene_length)
        ext = build_feature_extractor(params)
        infos_cache = fill_infos_cache(ext)

        self.trajdatas = trajdatas
        self.trajinfos = trajinfos
        self.roadways = roadways
        self.roadway = None
        self.scene = scene
        self.rec = rec
        self.ext = ext
        self.egoid = 0
        self.ego_veh = None
        self.traj_idx = 0
        self.t = 0          # next timestep to load from the trajectory data
        self.h = 0          # episode end timestep
        self.H = H
        self.primesteps = primesteps
        self.delta_t = delta_t
        self.terminate_on_collision = terminate_on_collision
        self.terminate_on_off_road = terminate_on_off_road
        self.epid = 0       # episode counter, bumped on every reset
        self.render_params = render_params
        self.infos_cache = infos_cache
[docs] def reset(self, offset: int = None, egoid: int = None, start: int = None, traj_idx: int = 0): ''' :param offset: offset frame number :param egoid: id for ego vehicle :param start: start time step :param traj_idx: trajectory corresponding index :return: features for the reset state ''' if offset is None: offset = self.H + self.primesteps self.traj_idx, self.egoid, self.t, self.h = sample_trajdata_vehicle( self.trajinfos, offset, traj_idx, egoid, start ) self.epid += 1 self.rec.empty() self.scene.empty() # prime for t in range(self.t, (self.t + self.primesteps + 1)): self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], t) # print("step {}:".format(t)) # for i in range(self.scene.n): # print(self.scene[i].id) self.rec.update(self.scene) # set the ego vehicle print("egoid: ", egoid) self.ego_veh = self.scene[self.scene.findfirst(self.egoid)] # set the roadway self.roadway = self.roadways[self.traj_idx] # self.t is the next timestep to load self.t += self.primesteps + 1 # enforce a maximum horizon self.h = min(self.h, self.t + self.H) return self.get_features()
def _step(self, action: list): ''' :param action: the action to take for the current step :return: step informations ''' # convert action into form # action[0] is `a::Float64` longitudinal acceleration [m/s^2] # action[1] is `ω::Float64` desired turn rate [rad/sec] action = action[0] # input should be a (n_agents,n_actions) size array ego_action = AccelTurnrate(action[0], action[1]) # propagate the ego vehicle ego_state = propagate( self.ego_veh, ego_action, self.roadway, self.delta_t ) # update the ego_veh self.ego_veh = Vehicle(ego_state, self.ego_veh.definition, self.ego_veh.id) # load the actual scene, and insert the vehicle into it self.scene = get_scene(self.scene, self.trajdatas[self.traj_idx], self.t) vehidx = self.scene.findfirst(self.egoid) orig_veh = self.scene[vehidx] # for infos purposes self.scene[vehidx] = self.ego_veh # update rec with current scene self.rec.update(self.scene) # compute info about the step step_infos = dict() step_infos["rmse_pos"] = norm((orig_veh.state.posG - self.ego_veh.state.posG)) step_infos["rmse_vel"] = abs((orig_veh.state.v - self.ego_veh.state.v)) step_infos["rmse_t"] = abs((orig_veh.state.posF.t - self.ego_veh.state.posF.t)) step_infos["x"] = self.ego_veh.state.posG.x step_infos["y"] = self.ego_veh.state.posG.y step_infos["orig_x"] = orig_veh.state.posG.x step_infos["orig_y"] = orig_veh.state.posG.y step_infos["s"] = self.ego_veh.state.posF.s step_infos["phi"] = self.ego_veh.state.posF.phi return step_infos def _extract_rewards(self, infos: dict): ''' :param infos: informations :return: extracted reward ''' # rewards design r = 0 if infos["is_colliding"] == 1: r -= 1 if infos["is_offroad"] == 1: r -= 1 return np.array([r]).reshape(1, -1) def _compute_feature_infos(self, features: list): ''' :param features: raw features :return: compute feature informations accordingly ''' features = features[0] # features size is (n_agent, n_features), so here is (1, 66) is_colliding = features[self.infos_cache["is_colliding_idx"]] 
markerdist_left = features[self.infos_cache["markerdist_left_idx"]] markerdist_right = features[self.infos_cache["markerdist_right_idx"]] is_offroad = features[self.infos_cache["out_of_lane_idx"]] return { "is_colliding": is_colliding, "markerdist_left": markerdist_left, "markerdist_right": markerdist_right, "is_offroad": is_offroad }
[docs] def step(self, action: list): ''' :param action: the action to take in the current step :return: features for the next state, reward for taking the action, if current episode has terminated, information ''' # action[0] is `a::Float64` longitudinal acceleration [m/s^2] # action[1] is `ω::Float64` desired turn rate [rad/sec] step_infos = self._step(action) # compute features and feature_infos features = self.get_features() feature_infos = self._compute_feature_infos(features) # combine infos infos = dict(**step_infos, **feature_infos) # update env timestep to be the next scene to load self.t += 1 # compute terminal if self.t >= self.h: terminal = True elif self.terminate_on_collision and infos["is_colliding"] == 1: terminal = True elif self.terminate_on_off_road and (abs(infos["markerdist_left"]) > 3 and abs(infos["markerdist_right"]) > 3): terminal = True else: terminal = False terminal = [terminal] reward = self._extract_rewards(infos) return features, reward, terminal, infos
[docs] def get_features(self): ''' :return: extracted feature for the current state ''' veh_idx = self.scene.findfirst(self.egoid) self.ext.pull_features( self.rec, self.roadway, veh_idx ) return np.array(copy.deepcopy(self.ext.features)).reshape(1, -1)
[docs] def observation_space_spec(self): ''' :return: observation space specifications ''' low = [0 for i in range(len(self.ext))] high = [0 for i in range(len(self.ext))] feature_infos = self.ext.feature_info() for (i, fn) in enumerate(self.ext.feature_names()): low[i] = feature_infos[fn]["low"] high[i] = feature_infos[fn]["high"] infos = {"high": high, "low": low} return (len(self.ext),), "Box", infos
[docs] def action_space_spec(self): ''' :return: action space specifications ''' return (2,), "Box", {"high": [4., .15], "low": [-4., -.15]}
[docs] def obs_names(self): ''' :return: feature names ''' return self.ext.feature_names()
@property def action_space(self): _, _, action_space = self.action_space_spec() return action_space