envs package

Submodules

envs.action module

class envs.action.AccelTurnrate(a: float, omega: float)[source]

Bases: object

envs.action.propagate(veh: src.Basic.Vehicle.Vehicle, action: envs.action.AccelTurnrate, roadway: src.Roadway.roadway.Roadway, delta_t: float, n_integration_steps: int = 4)[source]

Propagate the vehicle state according to the given action.

Parameters
  • veh – the vehicle that will be propagated

  • action – the action (acceleration, turning rate)

  • roadway – the roadway information

  • delta_t – how long the action will last

  • n_integration_steps – the number of integration substeps

Returns

the propagated vehicle state
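As an illustration, the substep integration can be sketched with simple unicycle kinematics. This is a hedged sketch: the state layout (x, y, theta, v) and the kinematic model are assumptions for illustration, not the actual src.Basic.Vehicle.Vehicle implementation, which also consults the roadway.

```python
import math
from dataclasses import dataclass

@dataclass
class AccelTurnrate:
    a: float      # longitudinal acceleration [m/s^2]
    omega: float  # turn rate [rad/s]

def propagate_sketch(x, y, theta, v, action, delta_t, n_integration_steps=4):
    """Euler-integrate assumed unicycle kinematics over delta_t in substeps."""
    dt = delta_t / n_integration_steps
    for _ in range(n_integration_steps):
        x += v * math.cos(theta) * dt   # advance position along heading
        y += v * math.sin(theta) * dt
        theta += action.omega * dt      # apply turn rate
        v += action.a * dt              # apply acceleration
    return x, y, theta, v
```

Splitting delta_t into n_integration_steps substeps keeps the Euler error small when the heading changes within a single action interval.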

envs.base module

class envs.base.AutoEnv(params: dict, trajdatas: list = None, trajinfos: list = None, roadways: list = None, reclength: int = 5, delta_t: float = 0.1, primesteps: int = 50, H: int = 50, terminate_on_collision: bool = True, terminate_on_off_road: bool = True, render_params: dict = {'viz_dir': '/tmp', 'zoom': 5.0})[source]

Bases: object

property action_space
action_space_spec()[source]
Returns

action space specifications

get_features()[source]
Returns

extracted feature for the current state

obs_names()[source]
Returns

feature names

observation_space_spec()[source]
Returns

observation space specifications

reset(offset: int = None, egoid: int = None, start: int = None, traj_idx: int = 0)[source]
Parameters
  • offset – offset frame number

  • egoid – id for ego vehicle

  • start – start time step

  • traj_idx – trajectory corresponding index

Returns

features for the reset state

step(action: list)[source]
Parameters

action – the action to take in the current step

Returns

features for the next state, the reward for taking the action, whether the current episode has terminated, and auxiliary information
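The step contract above follows the familiar gym-style loop. The sketch below shows how a rollout might consume it; DummyEnv is a hypothetical stand-in obeying the same (observation, reward, terminal, info) contract, not the real AutoEnv.

```python
class DummyEnv:
    """Hypothetical minimal env obeying the reset/step contract."""
    def __init__(self, H=5):
        self.H = H          # horizon, mirroring the H parameter above
        self.t = 0
    def reset(self, **kwargs):
        self.t = 0
        return [0.0]        # features for the reset state
    def step(self, action):
        self.t += 1
        obs = [float(self.t)]
        reward = 1.0
        terminal = self.t >= self.H   # terminate at the horizon
        info = {}
        return obs, reward, terminal, info

def rollout(env, policy, max_steps=100):
    """Run one episode, accumulating reward until termination."""
    obs = env.reset()
    total = 0.0
    for _ in range(max_steps):
        obs, reward, done, info = env.step(policy(obs))
        total += reward
        if done:
            break
    return total
```

With the real environment, termination may also occur on collision or off-road events when the corresponding terminate_on_* flags are set.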

envs.build_env module

envs.build_env.build_baseline(args, env)[source]

Build the baseline model.

envs.build_env.build_hierarchy(args, env, writer=None)[source]

Build the hierarchical model.

envs.build_env.build_ngsim_env(args, exp_dir='/tmp', n_veh=1, alpha=0.001, vectorize=False, render_params=None, videoMaking=False)[source]

Build the base NGSIM environment.

envs.build_env.build_policy(args, env, latent_sampler=None)[source]

Build the policy.

envs.build_env.build_reward_handler(args, writer=None)[source]
Parameters
  • args – args for building reward handler

  • writer – optional writer for logging; defaults to None

Returns

a reward handler

envs.hyperparams module

Default hyperparameters for training. These are built as args to allow for command-line options. The args are also saved along with parameters during training, so that everything can be rebuilt with the same settings.

envs.hyperparams.load_args(args_filepath)[source]

This function enables backward-compatible usage of saved args files by filling in missing values with default values.

envs.hyperparams.parse_args(arglist=None)[source]

envs.make module

class envs.make.Env(env_id, env_params)[source]

Bases: object

Basic Env Wrapper

property action_space
property num_envs
obs_names()[source]
property observation_space
render(*args, **kwargs)[source]
reset(dones=None, **kwargs)[source]
step(action)[source]
vec_env_executor(*args, **kwargs)[source]
property vectorized
envs.make.make_env(env_id: str, env_params: dict)[source]
Parameters
  • env_id – env id

  • env_params – env parameters

Returns

single agent env or multi agent env object

envs.multi_agent_env module

class envs.multi_agent_env.MultiAgentAutoEnv(params: dict, trajdatas: list = None, trajinfos: list = None, roadways: list = None, reclength: int = 5, delta_t: float = 0.1, primesteps: int = 50, H: int = 50, n_veh: int = 20, remove_ngsim_veh: bool = False, reward: int = 0, render_params: dict = {'viz_dir': '/tmp', 'zoom': 5.0})[source]

Bases: object

property action_space
action_space_spec()[source]
Returns

action space specifications

get_features(normalize_feature=False, clip_std_multiple=10.0)[source]
Parameters
  • normalize_feature – whether to normalize the extracted features

  • clip_std_multiple – clipping threshold in multiples of the standard deviation

Returns

extracted feature

num_envs()[source]
obs_names()[source]
Returns

name of observations

observation_space_spec()[source]
Returns

observation space specification

reset(dones: list = None, offset: int = None, random_seed: int = None, egoid: int = None, traj_idx: int = None, start: int = None)[source]
Parameters
  • dones – flag for indicating if agents have finished their episodes

  • offset – offset frame

  • random_seed – random seed

  • egoid – id for the ego vehicle

  • traj_idx – trajectory index

  • start – start time step

Returns

features for the reset state

step(action: list)[source]
Parameters

action – the action to take in the current step

Returns

features for the next state, the reward for taking the action, whether the current episode has terminated, and auxiliary information

vectorized()[source]

envs.utils module

envs.utils.add_kwargs_to_reset(env)[source]
envs.utils.build_space(shape, space_type, info={})[source]
Parameters
  • shape – shape of the space

  • space_type – the space type

  • info – additional space information (e.g. bounds)

Returns

the constructed space object
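A minimal sketch of such a helper, returning a plain dict as a stand-in for the space object (the real helper presumably returns a gym-style space); the 'Box' type name and the low/high keys in info are assumptions.

```python
import numpy as np

def build_space_sketch(shape, space_type, info=None):
    """Build a dict describing a bounded box space (illustrative only)."""
    info = info or {}
    if space_type == 'Box':
        # broadcast scalar bounds from info across the requested shape
        low = np.full(shape, info.get('low', -np.inf))
        high = np.full(shape, info.get('high', np.inf))
        return {'type': 'Box', 'shape': shape, 'low': low, 'high': high}
    raise ValueError('unsupported space type: {!r}'.format(space_type))
```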

envs.utils.cal_agent_rmse(error, k, verbose=False, lookahead_span=10)[source]
envs.utils.cal_avg(error: list, type: str = None)[source]
envs.utils.cal_lookahead(error: list, predict_span: int)[source]
envs.utils.cal_lookahead_rmse(error, j, verbose=False)[source]
envs.utils.cal_m_stability(error, h=50, T=150, e=0.1, verbose=False)[source]
envs.utils.cal_overall_rmse(error, verbose=False)[source]
envs.utils.cal_step_rmse(error, i, verbose=False)[source]
envs.utils.compute_lengths(arr)[source]
Parameters

arr – array

Returns

the length of the minimum non-zero list in the array, start index, end index

envs.utils.create_index_file(filepaths, minlength: int = 100)[source]
Parameters
  • filepaths – paths of the files to index

  • minlength – minimum length for indexes

Returns

None; the index file is created as a side effect

envs.utils.dict_get(d: dict, key, default)[source]
envs.utils.fill_infos_cache(ext: feature_extractor.feature_extractor.MultiFeatureExtractor)[source]
envs.utils.index_ngsim_trajectory(filepath: str, minlength: int = 100, offset: int = 0, verbose: int = 1)[source]
Parameters
  • filepath – file path to load trajectory

  • minlength – minimum length for trajectory

  • offset – offset frame

  • verbose – verbosity level

Returns

indexed trajectory data

envs.utils.keep_vehicle_subset(scene: src.Record.frame.Frame, ids: list)[source]
envs.utils.load_data(filepath, act_keys=['accel', 'turn_rate_global'], ngsim_filename='trajdata_holo_trajectories.txt', debug_size=None, min_length=50, normalize_data=True, shuffle=False, act_low=-1, act_high=1, clip_std_multiple=inf)[source]
Parameters
  • filepath – path to the data file

  • act_keys – keys of the action fields to load

  • ngsim_filename – name of the NGSIM trajectory file

  • debug_size – if set, truncate the dataset for debugging

  • min_length – minimum trajectory length

  • normalize_data – whether to normalize the data

  • shuffle – whether to shuffle the data

  • act_low – lower bound for actions

  • act_high – upper bound for actions

  • clip_std_multiple – clipping threshold in multiples of the standard deviation

Returns

envs.utils.load_ngsim_trajdatas(filepaths, minlength: int = 100)[source]
Parameters
  • filepaths – file path to load trajectory data

  • minlength – minimum length for trajectory

Returns

the loaded data

envs.utils.load_x_feature_names(filepath, ngsim_filename)[source]

Load features and feature names from file.

envs.utils.max_n_objects(trajdatas: list)[source]
envs.utils.maybe_mkdir(dirpath)[source]
envs.utils.normalize(x, clip_std_multiple=inf)[source]

Apply normalization, clipping values beyond clip_std_multiple standard deviations.
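A minimal sketch of clipped z-score normalization, assuming per-column statistics; the return of the fitted mean and std alongside the normalized data is an assumption, not necessarily this function's actual return value.

```python
import numpy as np

def normalize_sketch(x, clip_std_multiple=np.inf):
    """Z-score each column, clipping outliers before scaling."""
    mean = x.mean(axis=0, keepdims=True)
    x = x - mean
    std = x.std(axis=0, keepdims=True) + 1e-8   # avoid division by zero
    # clip values lying beyond clip_std_multiple standard deviations
    x = np.clip(x, -clip_std_multiple * std, clip_std_multiple * std)
    return x / std, mean, std
```

Returning the fitted mean and std lets the same statistics be reused to normalize actions or features at evaluation time.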

envs.utils.normalize_env_reset_with_kwargs(self, **kwargs)[source]
envs.utils.normalize_range(x, low, high)[source]

Apply normalization with a clipped range.
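A plausible sketch of range normalization: map [low, high] linearly onto [-1, 1] and clip anything outside. The target interval [-1, 1] is an assumption (it matches the act_low/act_high defaults used by load_data above).

```python
import numpy as np

def normalize_range_sketch(x, low, high):
    """Linearly rescale [low, high] to [-1, 1], clipping out-of-range values."""
    x = 2.0 * (x - low) / (high - low) - 1.0
    return np.clip(x, -1.0, 1.0)
```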

envs.utils.partition_list(lst, n)[source]
envs.utils.print_error(error: dict)[source]
envs.utils.random_sample_from_set_without_replacement(s: set, n)[source]
Parameters
  • s – set to sample from

  • n – the number of samples

Returns

sampled result
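One way to implement such sampling is shown below; the sorting step and the seed parameter are sketch choices for reproducibility, not necessarily part of the real function.

```python
import random

def sample_without_replacement_sketch(s, n, seed=None):
    """Draw n distinct elements from set s without replacement."""
    rng = random.Random(seed)
    # sort first so the draw is reproducible for a fixed seed, since
    # set iteration order is not guaranteed across runs
    return rng.sample(sorted(s), n)
```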

envs.utils.sample_multiple_trajdata_vehicle(n_veh: int, trajinfos, offset: int, max_resamples: int = 100, egoid: int = None, traj_idx: int = None, verbose: bool = True, rseed: int = None, multiple: bool = False)[source]
Parameters
  • n_veh – number of vehicles

  • trajinfos – trajectory information

  • offset – offset frame

  • max_resamples – maximum re-sample number

  • egoid – id for the ego vehicle

  • traj_idx – trajectory index

  • verbose – whether to print progress information

  • rseed – random seed

  • multiple – whether to sample multiple vehicles simultaneously

Returns

trajectory index, id for each vehicle, start frame, end frame

envs.utils.sample_trajdata_vehicle(trajinfos, offset: int = 0, traj_idx: int = None, egoid: int = None, start: int = None)[source]
envs.utils.save_error(error: list)[source]
envs.utils.select_multiple_trajdata_vehicle(n_veh: int, trajinfos, offset: int, max_resamples: int = 100, egoid: int = None, traj_idx: int = None, verbose: bool = True, period_start: int = 0, period_end: int = 100000, rseed: int = None, multiple: bool = False)[source]
Parameters
  • n_veh – number of vehicles

  • trajinfos – trajectory information

  • offset – offset frame

  • max_resamples – maximum re-sample number

  • period_start – first frame of the allowed sampling period

  • period_end – last frame of the allowed sampling period

  • egoid – id for the ego vehicle

  • traj_idx – trajectory index

  • verbose – whether to print progress information

  • rseed – random seed

  • multiple – whether to sample multiple vehicles simultaneously

Returns

trajectory index, id for each vehicle, start frame, end frame

envs.utils.str2bool(v)[source]
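Such a converter is commonly used as an argparse type so that --flag False is not parsed as truthy. A hedged sketch (the accepted spellings are an assumption):

```python
def str2bool_sketch(v):
    """Interpret common textual spellings of booleans from the command line."""
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    if v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise ValueError('cannot interpret {!r} as a boolean'.format(v))
```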
envs.utils.write_trajectories(filepath, trajs)[source]

Module contents