Source code for algorithms.RL_Algorithm.utils
import numpy as np


class RewardHandler(object):
    def __init__(
            self,
            use_env_rewards=True,
            critic_clip_low=-np.inf,
            critic_clip_high=np.inf,
            critic_initial_scale=1.,
            critic_final_scale=1.,
            recognition_initial_scale=1.,
            recognition_final_scale=1.,
            augmentation_scale=1.,
            normalize_rewards=False,
            alpha=.01,
            max_epochs=10000,
            summary_writer=None):
        self.use_env_rewards = use_env_rewards

        # clipping bounds applied to critic rewards before merging
        self.critic_clip_low = critic_clip_low
        self.critic_clip_high = critic_clip_high

        # critic and recognition scales are annealed from initial to final values
        self.critic_initial_scale = critic_initial_scale
        self.critic_final_scale = critic_final_scale
        self.critic_scale = critic_initial_scale
        self.recognition_initial_scale = recognition_initial_scale
        self.recognition_final_scale = recognition_final_scale
        self.recognition_scale = recognition_initial_scale
        self.augmentation_scale = augmentation_scale

        # running statistics used when normalize_rewards is enabled
        self.normalize_rewards = normalize_rewards
        self.alpha = alpha
        self.critic_reward_mean = 0.
        self.critic_reward_var = 1.
        self.recog_reward_mean = 0.
        self.recog_reward_var = 1.

        self.step = 0
        self.max_epochs = max_epochs
        self.summary_writer = summary_writer

    def _update_reward_estimate(self, rewards, reward_type):
        # unpack the running statistics for the requested reward type
        a = self.alpha
        mean = self.critic_reward_mean if reward_type == 'critic' else self.recog_reward_mean
        var = self.critic_reward_var if reward_type == 'critic' else self.recog_reward_var

        # exponential moving average of the batch mean
        new_mean = (1 - a) * mean + a * np.mean(rewards)
        # exponential moving average of the variance, computed about the previous mean
        new_var = (1 - a) * var + a * np.mean((rewards - mean) ** 2)

        # write the updated statistics back to the class members
        if reward_type == 'critic':
            self.critic_reward_mean = new_mean
            self.critic_reward_var = new_var
        else:
            self.recog_reward_mean = new_mean
            self.recog_reward_var = new_var
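
    # Note: _update_reward_estimate maintains exponential moving averages of the
    # batch statistics:
    #     mean <- (1 - alpha) * mean + alpha * mean(rewards)
    #     var  <- (1 - alpha) * var  + alpha * mean((rewards - mean_old) ** 2)
    # With the default alpha = .01 the running statistics adapt slowly, so a
    # single unusual batch has limited influence on the normalization below.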

    def _normalize_rewards(self, rewards, reward_type):
        self._update_reward_estimate(rewards, reward_type)
        var = self.critic_reward_var if reward_type == 'critic' else self.recog_reward_var
        # scale by the running standard deviation (the mean is not subtracted)
        return rewards / (np.sqrt(var) + 1e-8)

    def _update_scales(self):
        self.step += 1
        # linearly anneal the scales from their initial to their final values
        frac = np.minimum(self.step / self.max_epochs, 1)
        self.critic_scale = self.critic_initial_scale \
            + frac * (self.critic_final_scale - self.critic_initial_scale)
        self.recognition_scale = self.recognition_initial_scale \
            + frac * (self.recognition_final_scale - self.recognition_initial_scale)
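
    # Note: _update_scales anneals critic_scale and recognition_scale linearly
    # over the first max_epochs calls; after that, frac is capped at 1 and the
    # scales remain fixed at their final values.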

    def merge(
            self,
            paths,
            critic_rewards=None,
            recognition_rewards=None):
        """
        Add critic and recognition rewards to the path rewards based on settings.

        Args:
            paths: list of dictionaries as described in process_samples
            critic_rewards: list of numpy arrays, each with the same shape as the
                corresponding path['rewards']
            recognition_rewards: same format as critic_rewards
        """
        # update relative reward scales
        self._update_scales()

        # combine the different rewards for each path
        for (i, path) in enumerate(paths):
            shape = np.shape(path['rewards'])

            # environment rewards
            if self.use_env_rewards:
                path['rewards'] = np.float32(path['rewards'])
            else:
                path['rewards'] = np.zeros(shape, dtype=np.float32)

            # critic rewards: clip, optionally normalize, then add with the current scale
            if critic_rewards is not None:
                critic_rewards[i] = np.clip(
                    critic_rewards[i], self.critic_clip_low, self.critic_clip_high)
                if self.normalize_rewards:
                    critic_rewards[i] = self._normalize_rewards(
                        critic_rewards[i], reward_type='critic')
                path['rewards'] += self.critic_scale * np.reshape(critic_rewards[i], shape)

            # recognition rewards: optionally normalize, then add with the current scale
            if recognition_rewards is not None:
                if self.normalize_rewards:
                    recognition_rewards[i] = self._normalize_rewards(
                        recognition_rewards[i], reward_type='recognition')
                path['rewards'] += self.recognition_scale * np.reshape(recognition_rewards[i], shape)

        return paths
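
A minimal usage sketch of RewardHandler.merge, assuming paths follow the dictionary format described in the docstring above; the reward values and constructor arguments below are illustrative only, not taken from the original project.

import numpy as np

from algorithms.RL_Algorithm.utils import RewardHandler

# Two dummy paths with float reward arrays (illustrative values only).
paths = [
    dict(rewards=np.ones(5, dtype=np.float32)),
    dict(rewards=np.zeros(3, dtype=np.float32)),
]

# Hypothetical critic outputs, one array per path, matching each path's reward shape.
critic_rewards = [np.random.randn(5), np.random.randn(3)]

handler = RewardHandler(
    use_env_rewards=True,
    critic_clip_low=-10.,
    critic_clip_high=10.,
    critic_initial_scale=1.,
    critic_final_scale=.1,
    normalize_rewards=True,
    max_epochs=100)

# merge clips, normalizes, scales, and adds the critic rewards onto each
# path['rewards'] in place, and also returns the list of paths.
paths = handler.merge(paths, critic_rewards=critic_rewards)
print(paths[0]['rewards'])  # five merged reward values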