Source code for algorithms.policy.GaussianMLPBaseline
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from algorithms.policy.MLP import MLP
class GaussianMLP(nn.Module):
    def __init__(
        self,
        input_dim,
        output_dim,
        mean_network=None,
        optimizer=None,
        hidden_size=(32, 32),
        step_size=0.01,
        init_std=1.0,
        normalize_inputs=True,
        normalize_outputs=True,
        subsample_factor=1.0,
        max_itr=20
    ):
"""
:param input_dim: input dimension
:param output_dim: output dimension
:param mean_network: mean network
:param hidden_size: hidden layer size
:param step_size: learning rate
:param init_std: initial std
:param normalize_inputs: boolean var indicating whether we need to normalize input
:param normalize_outputs: boolean var indicating whether we need to normalize output
"""
        super(GaussianMLP, self).__init__()
        if mean_network is None:
            mean_network = MLP(input_size=input_dim, hidden_size=hidden_size,
                               output_size=output_dim)
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.mean_network = mean_network
        self.lr = step_size
        self.init_std = init_std
        self.normalize_inputs = normalize_inputs
        self.normalize_outputs = normalize_outputs
        self.subsample_factor = subsample_factor
        if optimizer is None:
            optimizer = optim.RMSprop(mean_network.parameters(), lr=self.lr)
        self.optimizer = optimizer
        self.criterion = nn.MSELoss()
        self.max_itr = max_itr
    def forward(self, x):
        if self.normalize_inputs:
            x = (x - x.mean(dim=0)) / (x.std(dim=0) + 1e-4)
        # Move the input to whichever device the network parameters live on.
        device = next(self.mean_network.parameters()).device
        x = x.to(device)
        mean = self.mean_network(x)
        if self.normalize_outputs:
            mean = (mean - mean.mean(dim=0)) / (mean.std(dim=0) + 1e-4)
        mean = mean.double()
        return mean
    def fit(self, xs, ys):
        """
        :param xs: input features
        :param ys: ground-truth targets
        :return: no return value; fits the model in place
        """
        # Put the data on the same device as the network parameters.
        device = next(self.mean_network.parameters()).device
        xs = torch.tensor(xs, dtype=torch.float64, device=device)
        ys = torch.tensor(ys, dtype=torch.float64, device=device)
        if self.subsample_factor < 1:
            # Fit on a random subset of the data.
            num_samples_tot = xs.shape[0]
            idx = np.random.randint(0, num_samples_tot,
                                    int(num_samples_tot * self.subsample_factor))
            xs, ys = xs[idx], ys[idx]
        if self.normalize_outputs:
            ys_mean = ys.mean(dim=0)
            ys_std = ys.std(dim=0)
            ys = (ys - ys_mean) / (ys_std + 1e-4)
        for itr in range(self.max_itr):
            output = self.forward(xs)
            loss = self.criterion(output, ys)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
    def predict(self, xs):
        """
        :param xs: input features
        :return: predictions for xs as a numpy array
        """
        xs = torch.tensor(xs).double()
        return self.forward(xs).detach().cpu().numpy()
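
# A minimal usage sketch for GaussianMLP (illustrative only, not part of the
# original module; the dimensions and random data are made-up assumptions):
#
#     model = GaussianMLP(input_dim=4, output_dim=1).double()
#     xs = np.random.randn(100, 4)
#     ys = np.random.randn(100, 1)
#     model.fit(xs, ys)          # runs max_itr optimizer steps on the MSE loss
#     preds = model.predict(xs)  # numpy array of shape (100, 1)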
class GaussianMLPBaseline(object):
    """
    Baseline (state-value) model used to reduce the variance of
    policy-gradient estimates.
    """
    def __init__(
        self,
        env_spec,
        subsample_factor=1,
        num_seq_inputs=1,
        regressor_args=None
    ):
        if regressor_args is None:
            regressor_args = dict()
        # Forward the subsample factor to the regressor unless the caller
        # already supplied one explicitly.
        regressor_args.setdefault("subsample_factor", subsample_factor)
        self._regressor = GaussianMLP(
            input_dim=env_spec.observation_space.flat_dim * num_seq_inputs,
            output_dim=1,
            **regressor_args
        ).double()
    def fit(self, paths):
        """
        :param paths: list of rollout dicts containing "observations" and
            "returns"
        :return: no return value; fits the baseline model in place
        """
        observations = np.concatenate([p["observations"] for p in paths])
        returns = np.concatenate([p["returns"] for p in paths])
        self._regressor.fit(observations, returns.reshape((-1, 1)))
    def predict(self, path):
        """
        :param path: rollout dict containing "observations"
        :return: predicted returns (baseline values) for the observations
        """
        return self._regressor.predict(path["observations"]).flatten()
    def parameters(self):
        return self._regressor.parameters()

    def set_cuda(self):
        self._regressor.cuda()
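
# A minimal end-to-end sketch (illustrative only, not part of the original
# module). The SimpleNamespace below is a made-up stand-in for an environment
# spec exposing observation_space.flat_dim; a real spec comes from the
# surrounding RL framework.
if __name__ == "__main__":
    from types import SimpleNamespace

    env_spec = SimpleNamespace(observation_space=SimpleNamespace(flat_dim=4))
    baseline = GaussianMLPBaseline(env_spec)

    # Two synthetic rollouts with random observations and returns.
    paths = [
        {"observations": np.random.randn(50, 4),
         "returns": np.random.randn(50)}
        for _ in range(2)
    ]
    baseline.fit(paths)
    values = baseline.predict(paths[0])
    print(values.shape)  # (50,)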