Source code for envs.build_env

import os
import numpy as np
import tensorflow as tf
from envs.make import make_env, Env
from envs.utils import add_kwargs_to_reset

from rllab.envs.normalized_env import normalize as normalize_env

from hgail.policies.latent_sampler import UniformlyRandomLatentSampler
from hgail.policies.scheduling import ConstantIntervalScheduler
from hgail.envs.spec_wrapper_env import SpecWrapperEnv
from hgail.policies.categorical_latent_sampler import CategoricalLatentSampler
from hgail.samplers.hierarchy_sampler import HierarchySampler
from hgail.algos.hgail_impl import Level
from hgail.envs.vectorized_normalized_env import vectorized_normalized_env
from hgail.policies.gaussian_latent_var_gru_policy import GaussianLatentVarGRUPolicy
from hgail.policies.gaussian_latent_var_mlp_policy import GaussianLatentVarMLPPolicy
from hgail.baselines.gaussian_mlp_baseline import GaussianMLPBaseline

from sandbox.rocky.tf.spaces.discrete import Discrete
from sandbox.rocky.tf.algos.trpo import TRPO
from sandbox.rocky.tf.policies.gaussian_mlp_policy import GaussianMLPPolicy
from sandbox.rocky.tf.policies.gaussian_gru_policy import GaussianGRUPolicy
from sandbox.rocky.tf.envs.base import TfEnv

import hgail.misc.utils


def build_ngsim_env(
        args,
        exp_dir='/tmp',
        n_veh=1,
        alpha=0.001,
        vectorize=False,
        render_params=None,
        videoMaking=False):
    '''build the base NGSIM env'''
    basedir = os.path.expanduser('~/Autoenv/data')
    filepaths = [os.path.join(basedir, args.ngsim_filename)]
    # if render_params is None:
    #     render_params = dict(
    #         viz_dir=os.path.join(exp_dir, 'imitate/viz'),
    #         zoom=5.
    #     )
    env_params = dict(
        trajectory_filepaths=filepaths,
        H=args.env_H,
        primesteps=args.env_primesteps,
        action_repeat=args.env_action_repeat,
        terminate_on_collision=False,
        terminate_on_off_road=False,
        # render_params=render_params,
        n_envs=args.n_envs,
        n_veh=n_veh,
        remove_ngsim_veh=args.remove_ngsim_veh,
        reward=args.env_reward
    )
    # Order matters here because multiagent is a subset of vectorized,
    # i.e., if you want to run with multiagent=True, then vectorize must
    # also be True.
    if args.env_multiagent:
        env_id = "MultiAgentAutoEnv"
        alpha = alpha * args.n_envs
        normalize_wrapper = vectorized_normalized_env
    else:
        env_id = "NGSIMEnv"
        normalize_wrapper = normalize_env
    # print(env_params)
    env = Env(env_id=env_id, env_params=env_params)
    # get low and high values for normalizing _real_ actions
    low, high = env.action_space.low, env.action_space.high
    trajinfos = env.trajinfos
    env = TfEnv(normalize_wrapper(env, normalize_obs=True, obs_alpha=alpha))
    add_kwargs_to_reset(env)
    return env, trajinfos, low, high
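A minimal usage sketch, assuming an argparse-style args namespace; only the attributes build_ngsim_env reads are shown, and every value (including the trajectory file name) is an illustrative placeholder rather than one prescribed by this module:

    from argparse import Namespace

    args = Namespace(
        ngsim_filename='example_trajectories.txt',  # hypothetical file under ~/Autoenv/data
        env_H=200,
        env_primesteps=50,
        env_action_repeat=1,
        n_envs=1,
        remove_ngsim_veh=False,
        env_reward=0,
        env_multiagent=False,
    )
    env, trajinfos, act_low, act_high = build_ngsim_env(args, exp_dir='/tmp')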
def build_reward_handler(args, writer=None):
    '''
    :param args: args used to configure the reward handler
    :param writer: optional summary writer for logging reward statistics
    :return: a reward handler
    '''
    reward_handler = hgail.misc.utils.RewardHandler(
        use_env_rewards=args.reward_handler_use_env_rewards,
        max_epochs=args.reward_handler_max_epochs,  # epoch at which final scales are used
        critic_final_scale=args.reward_handler_critic_final_scale,
        recognition_initial_scale=0.,
        recognition_final_scale=args.reward_handler_recognition_final_scale,
        summary_writer=writer,
        normalize_rewards=True,
        critic_clip_low=-100,
        critic_clip_high=100,
    )
    return reward_handler
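A sketch of wiring in a summary writer, assuming TensorFlow 1.x (which the tf.variable_scope usage below implies); the log directory is illustrative:

    summary_dir = os.path.join('/tmp', 'imitate', 'summaries')  # illustrative path
    writer = tf.summary.FileWriter(summary_dir)
    reward_handler = build_reward_handler(args, writer=writer)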
def build_hierarchy(args, env, writer=None):
    '''build hierarchical model'''
    levels = []

    latent_sampler = UniformlyRandomLatentSampler(
        name='base_latent_sampler',
        dim=args.latent_dim,
        scheduler=ConstantIntervalScheduler(k=args.env_H)
    )
    for level_idx in [1, 0]:
        # wrap env in a different spec depending on the level
        if level_idx == 0:
            level_env = env
        else:
            level_env = SpecWrapperEnv(
                env,
                action_space=Discrete(args.latent_dim),
                observation_space=env.observation_space
            )

        with tf.variable_scope('level_{}'.format(level_idx)):
            # recognition_model = build_recognition_model(args, level_env, writer)
            recognition_model = None
            if level_idx == 0:
                policy = build_policy(args, env, latent_sampler=latent_sampler)
            else:
                scheduler = ConstantIntervalScheduler(k=args.scheduler_k)
                policy = latent_sampler = CategoricalLatentSampler(
                    scheduler=scheduler,
                    name='latent_sampler',
                    policy_name='latent_sampler_policy',
                    dim=args.latent_dim,
                    env_spec=level_env.spec,
                    latent_sampler=latent_sampler,
                    max_n_envs=args.n_envs
                )
            baseline = build_baseline(args, level_env)
            if args.vectorize:
                force_batch_sampler = False
                if level_idx == 0:
                    sampler_args = dict(n_envs=args.n_envs)
                else:
                    sampler_args = None
            else:
                force_batch_sampler = True
                sampler_args = None

            sampler_cls = None if level_idx == 0 else HierarchySampler
            algo = TRPO(
                env=level_env,
                policy=policy,
                baseline=baseline,
                batch_size=args.batch_size,
                max_path_length=args.max_path_length,
                n_itr=args.n_itr,
                discount=args.discount,
                step_size=args.trpo_step_size,
                sampler_cls=sampler_cls,
                force_batch_sampler=force_batch_sampler,
                sampler_args=sampler_args,
                optimizer_args=dict(
                    max_backtracks=50,
                    debug_nan=True
                )
            )
            reward_handler = build_reward_handler(args, writer)
            level = Level(
                depth=level_idx,
                algo=algo,
                reward_handler=reward_handler,
                recognition_model=recognition_model,
                start_itr=0,
                end_itr=0 if level_idx == 0 else np.inf
            )
            levels.append(level)

    # by convention the order of the levels should be increasing,
    # but they must be built in the reverse order,
    # so reverse the list before returning it
    return list(reversed(levels))
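A sketch of the expected calling order, assuming args also carries the hierarchy-related fields referenced above (latent_dim, scheduler_k, batch_size, n_itr, and so on):

    env, trajinfos, act_low, act_high = build_ngsim_env(args)
    levels = build_hierarchy(args, env)  # ordered [level 0, level 1] after the reversal above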
''' build policy functions '''
def build_policy(args, env, latent_sampler=None):
    '''building policy'''
    if args.use_infogail:
        if latent_sampler is None:
            latent_sampler = UniformlyRandomLatentSampler(
                scheduler=ConstantIntervalScheduler(k=args.scheduler_k),
                name='latent_sampler',
                dim=args.latent_dim
            )
        if args.policy_recurrent:
            policy = GaussianLatentVarGRUPolicy(
                name="policy",
                latent_sampler=latent_sampler,
                env_spec=env.spec,
                hidden_dim=args.recurrent_hidden_dim,
            )
        else:
            print("GaussianLatentVarMLPPolicy")
            policy = GaussianLatentVarMLPPolicy(
                name="policy",
                latent_sampler=latent_sampler,
                env_spec=env.spec,
                hidden_sizes=args.policy_mean_hidden_layer_dims,
                std_hidden_sizes=args.policy_std_hidden_layer_dims
            )
    else:
        if args.policy_recurrent:
            print("GaussianGRUPolicy")
            policy = GaussianGRUPolicy(
                name="policy",
                env_spec=env.spec,
                hidden_dim=args.recurrent_hidden_dim,
                output_nonlinearity=None,
                learn_std=True
            )
        else:
            print("GaussianMLPPolicy")
            policy = GaussianMLPPolicy(
                name="policy",
                env_spec=env.spec,
                hidden_sizes=args.policy_mean_hidden_layer_dims,
                std_hidden_sizes=args.policy_std_hidden_layer_dims,
                adaptive_std=True,
                output_nonlinearity=None,
                learn_std=True
            )
    return policy
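The policy class is selected purely from two flags on args; a short sketch with illustrative flag values:

    args.use_infogail = False
    args.policy_recurrent = True
    policy = build_policy(args, env)  # selects GaussianGRUPolicy in this configuration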
def build_baseline(args, env):
    '''building baseline'''
    return GaussianMLPBaseline(env_spec=env.spec)
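Outside of build_hierarchy, a flat (single-level) training setup can be assembled from the same helpers. A sketch reusing env and args from the examples above and only the TRPO arguments already referenced in this module:

    policy = build_policy(args, env)
    baseline = build_baseline(args, env)
    algo = TRPO(
        env=env,
        policy=policy,
        baseline=baseline,
        batch_size=args.batch_size,
        max_path_length=args.max_path_length,
        n_itr=args.n_itr,
        discount=args.discount,
        step_size=args.trpo_step_size,
    )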