一、什么是多重离散动作空间?
多重离散动作空间是指一个环境中具有多个离散动作空间的情况。在传统的强化学习中,通常假设环境的动作空间是离散的,即代理可以选择的动作是有限且离散的。然而,在某些情况下,一个环境可能具有多个离散动作空间,每个动作空间对应于不同的维度。
举个例子,假设你有一个机器人控制问题,机器人可以在三个维度上采取动作:X轴方向、Y轴方向和Z轴方向。每个维度上的动作空间都是离散的,例如在X轴上可以选择向左、向右或不动,在Y轴和Z轴上也有类似的选择。这样,整个动作空间就被分解为三个离散动作空间的笛卡尔积。
多重离散动作空间可以用于描述具有多个维度的离散动作选择问题,每个维度都有自己的离散动作空间。在这种情况下,代理需要为每个维度选择一个动作,将这些动作组合成一个多维动作向量,然后执行这个向量作为一个整体的动作。
在强化学习中,针对多重离散动作空间的问题,我们可以使用适当的算法和技术来解决,例如深度Q网络(DQN)的多重离散版本或者使用策略梯度方法。这些算法可以允许代理在多个维度上做出独立的动作选择,并通过学习来优化整体的策略。
示例代码分析
第一个例子是使用OpenAI Gym库来创建离散动作空间的环境,并演示如何创建多重离散动作空间。
import gym
from gym.spaces import Discrete, MultiDiscrete
"""
e.g. Nintendo Game Controller
- Can be conceptualized as 3 discrete action spaces:
1) Arrow Keys: Discrete 5 - NOOP[0], UP[1], RIGHT[2], DOWN[3], LEFT[4] - params: min: 0, max: 4
2) Button A: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
3) Button B: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
"""
# discrete action space env
env = gym.make('Pong-v4')
assert env.action_space == Discrete(6)
# multi discrete action space
md_space = MultiDiscrete([2, 3]) # 6 = 2 * 3
示例中的环境为游戏《乓》(英语:Pong)是雅达利在1972年11月29日推出的一款投币式街机游戏。《乓》是一款乒乓球游戏,其英文名称“Pong”来自乒乓球击打后所发出的声音。《乓》很多时候也有认为是电子游戏历史上第一个街机电子游戏。在此游戏中,玩家的目的就是在模拟乒乓球比赛中夺取高分以击败电脑玩家。此游戏的开发者为艾伦·奥尔康。
这段代码使用了OpenAI Gym库来创建游戏环境,并定义了离散动作空间和多重离散动作空间。
import gym
from gym.spaces import Discrete, MultiDiscrete
首先,代码导入了gym
模块和Discrete
、MultiDiscrete
类。gym
是一个开源的强化学习库,提供了许多经典的强化学习环境和相关工具函数。Discrete
和MultiDiscrete
是Gym中定义离散动作空间的类。
接下来,代码给出了一个示例,将任天堂游戏控制器作为说明。注释中提到,游戏控制器可以被概念化为3个离散的动作空间。
1) Arrow Keys(方向键):离散5个动作,包括NOOP[0],UP[1],RIGHT[2],DOWN[3],LEFT[4]。这个动作空间的参数范围是0到4,最小值为0,最大值为4。
2) Button A(A按钮):离散2个动作,包括NOOP[0],Pressed[1]。这个动作空间的参数范围是0到1,最小值为0,最大值为1。
3) Button B(B按钮):离散2个动作,包括NOOP[0],Pressed[1]。这个动作空间的参数范围是0到1,最小值为0,最大值为1。
env = gym.make('Pong-v4')
然后,代码创建了一个离散动作空间的环境,使用的是名为’Pong-v4’的游戏环境。这个游戏环境是OpenAI Gym提供的一个版本,代表了Pong游戏。通过gym.make
函数创建环境对象,并将其分配给名为env
的变量。
assert env.action_space == Discrete(6)
接着,代码使用断言语句来验证环境的动作空间是否为离散动作空间,并检查动作空间的范围是否为0到5(共6个动作)。这是因为’Pong-v4’游戏环境的动作空间是一个离散动作空间,有6个动作可供选择。
md_space = MultiDiscrete([2, 3]) # 6 = 2 * 3
最后,代码创建了一个多重离散动作空间md_space
,使用MultiDiscrete
类来定义动作空间的维度和范围。在这个示例中,md_space
是一个二维的动作空间,第一个维度有2个动作,第二个维度有3个动作,总共有6个动作。
MultiDiscreteEnv类
class MultiDiscreteEnv(gym.Wrapper):
"""Map the actions from the factorized action spaces to the original single action space.
:param gym.Env env: the environment to wrap.
:param list action_shape: dims of the the factorized action spaces.
"""
def __init__(self, env, action_shape):
super().__init__(env)
self.action_shape = np.flip(np.cumprod(np.flip(np.array(action_shape))))
def step(self, action):
"""
Overview:
Step the environment with the given factorized actions.
Arguments:
- action (:obj:`list`): a list contains the action output of each discrete dimension, e.g.: [1, 1] means 1 * 3 + 1 = 4 for a factorized action 2 * 3 = 6
"""
action = action[0] * self.action_shape[1] + action[1]
obs, reward, done, info = self.env.step(action)
return obs, reward, done, info
这段代码定义了一个名为MultiDiscreteEnv的类,该类是gym.Wrapper的子类,用于将分解的动作空间映射到原始的单一动作空间。
构造函数init接受两个参数:env是要包装的环境对象,action_shape是分解动作空间的维度。
在init方法中,首先调用了super().init(env)来调用父类gym.Wrapper的构造函数进行初始化。然后,使用np.flip和np.cumprod函数对action_shape进行处理,计算了一个用于映射分解动作到原始动作的形状数组self.action_shape。
step方法用于执行环境的一步操作。它接受一个参数action,该参数是一个列表,包含每个离散维度的动作输出。例如,[1, 1]表示对于一个分解动作2 * 3 = 6,其中第一个维度的动作是1,第二个维度的动作是1。
在step方法中,首先计算了一个合并的动作值action,通过将第一个维度的动作乘以第二个维度的大小,再加上第二个维度的动作。这样就将分解的动作映射回原始的单一动作空间。
然后,调用被包装的环境对象的step方法,传递映射后的动作值作为参数,获取观测值obs、奖励值reward、完成标志done和额外信息info。
最后,将观测值、奖励值、完成标志和额外信息作为结果返回。
所以当配置一个使用多重离散动作空间的实验时,有几个关键的配置步骤需要注意。
-
将action_shape从一个整数改为一个列表,列表中包含了分解动作空间的维度。这个改变需要在config.policy.model和env.info()这两个地方进行配置。config.policy.model是模型配置信息的键,而env.info()是环境对象的方法,用于获取环境的信息。
-
在配置文件的config.env中,将键multi_discrete的值设置为True。这个配置指示代码使用MultiDiscreteEnv包装器来处理多重离散动作空间。config.env是配置文件中用于指定环境配置的部分。
多重离散动作空间版本的Q-learning的forward_learn部分代码分析。
# ====================
# Q-learning forward
# ====================
self._learn_model.train()
self._target_model.train()
# Current q value (main model)
q_value = self._learn_model.forward(data['obs'])['logit']
# Target q value
with torch.no_grad():
target_q_value = self._target_model.forward(data['next_obs'])['logit']
# Max q value action (main model)
target_q_action = self._learn_model.forward(data['next_obs'])['action']
value_gamma = data.get('value_gamma')
if isinstance(q_value, torch.Tensor):
data_n = q_nstep_td_data(
q_value, target_q_value, data['action'], target_q_action, data['reward'], data['done'], data['weight']
)
loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
else:
action_num = len(q_value)
loss, td_error_per_sample = [], []
for i in range(action_num):
td_data = q_nstep_td_data(
q_value[i], target_q_value[i], data['action'][i], target_q_action[i], data['reward'], data['done'], data['weight']
)
loss_, td_error_per_sample_ = q_nstep_td_error(td_data, self._gamma, nstep=self._nstep)
loss.append(loss_)
td_error_per_sample.append(td_error_per_sample_.abs())
loss = sum(loss) / (len(loss) + 1e-8)
td_error_per_sample = sum(td_error_per_sample) / (len(td_error_per_sample) + 1e-8)
多重离散版本DQNMultiDiscretePolicy
继承DQNPolicy
并且只覆盖_forward_learn
接口. 在这个多重离散动作空间版本的 Q-learning 的 _forward_learn 中,每层动作空间使用全局奖励计算自己的 q 值、动作和 td 损失,计算遵循与单个动作空间计算相同的过程。
self._learn_model.train()
self._target_model.train()
这两行代码将学习模型(self._learn_model)和目标模型(self._target_model)设置为训练模式,以确保在前向计算中启用梯度计算。
q_value = self._learn_model.forward(data['obs'])['logit']
这行代码使用学习模型(self._learn_model)对当前状态(data[‘obs’])进行前向计算,然后获取Q值(q_value)。
with torch.no_grad():
target_q_value = self._target_model.forward(data['next_obs'])['logit']
target_q_action = self._learn_model.forward(data['next_obs'])['action']
在这个代码块中,使用torch.no_grad()上下文管理器,禁用梯度计算。然后,对目标模型(self._target_model)和学习模型(self._learn_model)分别对下一个状态(data[‘next_obs’])进行前向计算。从目标模型中获取目标Q值(target_q_value),从学习模型中获取目标Q值所对应的动作(target_q_action)。
value_gamma = data.get('value_gamma')
if isinstance(q_value, torch.Tensor):
data_n = q_nstep_td_data(
q_value, target_q_value, data['action'], target_q_action, data['reward'], data['done'], data['weight']
)
loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
else:
action_num = len(q_value)
loss, td_error_per_sample = [], []
for i in range(action_num):
td_data = q_nstep_td_data(
q_value[i], target_q_value[i], data['action'][i], target_q_action[i], data['reward'], data['done'], data['weight']
)
loss_, td_error_per_sample_ = q_nstep_td_error(td_data, self._gamma, nstep=self._nstep)
loss.append(loss_)
td_error_per_sample.append(td_error_per_sample_.abs())
loss = sum(loss) / (len(loss) + 1e-8)
td_error_per_sample = sum(td_error_per_sample) / (len(td_error_per_sample) + 1e-8)
这部分代码根据q_value
的类型来进行不同的处理。如果q_value
是一个torch.Tensor,说明只有一个动作空间。
在这种情况下,代码会调用q_nstep_td_data()
函数来计算TD数据(data_n),其中包括当前状态的Q值(q_value)、目标Q值(target_q_value)、动作(data[‘action’])、目标Q值所对应的动作(target_q_action)、奖励(data[‘reward’])、终止标志(data[‘done’])和权重(data[‘weight’])。
然后,调用q_nstep_td_error()
函数,使用TD数据(data_n)计算TD误差(td_error_per_sample),并将其与损失(loss)一起返回。
如果q_value
不是torch.Tensor,那么说明有多个动作空间。在这种情况下,代码会遍历每个动作空间,分别计算每个动作空间的TD数据和TD误差。然后,对每个动作空间的损失进行求和并计算平均损失(loss),对每个动作空间的TD误差进行求和并计算平均TD误差(td_error_per_sample)。
最后,计算得到的损失和TD误差会被返回供后续使用。
评论(3)
您还未登录,请登录后发表或查看评论