一、什么是多重离散动作空间?

多重离散动作空间是指一个环境中具有多个离散动作空间的情况。在传统的强化学习中,通常假设环境的动作空间是离散的,即代理可以选择的动作是有限且离散的。然而,在某些情况下,一个环境可能具有多个离散动作空间,每个动作空间对应于不同的维度。

举个例子,假设你有一个机器人控制问题,机器人可以在三个维度上采取动作:X轴方向、Y轴方向和Z轴方向。每个维度上的动作空间都是离散的,例如在X轴上可以选择向左、向右或不动,在Y轴和Z轴上也有类似的选择。这样,整个动作空间就被分解为三个离散动作空间的笛卡尔积。

多重离散动作空间可以用于描述具有多个维度的离散动作选择问题,每个维度都有自己的离散动作空间。在这种情况下,代理需要为每个维度选择一个动作,将这些动作组合成一个多维动作向量,然后执行这个向量作为一个整体的动作。

在强化学习中,针对多重离散动作空间的问题,我们可以使用适当的算法和技术来解决,例如深度Q网络(DQN)的多重离散版本或者使用策略梯度方法。这些算法可以允许代理在多个维度上做出独立的动作选择,并通过学习来优化整体的策略。

示例代码分析

第一个例子是使用OpenAI Gym库来创建离散动作空间的环境,并演示如何创建多重离散动作空间。

import gym
from gym.spaces import Discrete, MultiDiscrete
"""
e.g. Nintendo Game Controller
- Can be conceptualized as 3 discrete action spaces:
    1) Arrow Keys: Discrete 5  - NOOP[0], UP[1], RIGHT[2], DOWN[3], LEFT[4]  - params: min: 0, max: 4
    2) Button A:   Discrete 2  - NOOP[0], Pressed[1] - params: min: 0, max: 1
    3) Button B:   Discrete 2  - NOOP[0], Pressed[1] - params: min: 0, max: 1
"""

# discrete action space env
env = gym.make('Pong-v4')
assert env.action_space == Discrete(6)
# multi discrete action space
md_space = MultiDiscrete([2, 3])  # 6 = 2 * 3

示例中的环境为游戏《乓》(英语:Pong)是雅达利在1972年11月29日推出的一款投币式街机游戏。《乓》是一款乒乓球游戏,其英文名称“Pong”来自乒乓球击打后所发出的声音。《乓》很多时候也有认为是电子游戏历史上第一个街机电子游戏。在此游戏中,玩家的目的就是在模拟乒乓球比赛中夺取高分以击败电脑玩家。此游戏的开发者为艾伦·奥尔康。

这段代码使用了OpenAI Gym库来创建游戏环境,并定义了离散动作空间和多重离散动作空间。

import gym
from gym.spaces import Discrete, MultiDiscrete

首先,代码导入了gym模块和DiscreteMultiDiscrete类。gym是一个开源的强化学习库,提供了许多经典的强化学习环境和相关工具函数。DiscreteMultiDiscrete是Gym中定义离散动作空间的类。

接下来,代码给出了一个示例,将任天堂游戏控制器作为说明。注释中提到,游戏控制器可以被概念化为3个离散的动作空间。

1) Arrow Keys(方向键):离散5个动作,包括NOOP[0],UP[1],RIGHT[2],DOWN[3],LEFT[4]。这个动作空间的参数范围是0到4,最小值为0,最大值为4。
2) Button A(A按钮):离散2个动作,包括NOOP[0],Pressed[1]。这个动作空间的参数范围是0到1,最小值为0,最大值为1。
3) Button B(B按钮):离散2个动作,包括NOOP[0],Pressed[1]。这个动作空间的参数范围是0到1,最小值为0,最大值为1。

env = gym.make('Pong-v4')

然后,代码创建了一个离散动作空间的环境,使用的是名为’Pong-v4’的游戏环境。这个游戏环境是OpenAI Gym提供的一个版本,代表了Pong游戏。通过gym.make函数创建环境对象,并将其分配给名为env的变量。

assert env.action_space == Discrete(6)

接着,代码使用断言语句来验证环境的动作空间是否为离散动作空间,并检查动作空间的范围是否为0到5(共6个动作)。这是因为’Pong-v4’游戏环境的动作空间是一个离散动作空间,有6个动作可供选择。

md_space = MultiDiscrete([2, 3])  # 6 = 2 * 3

最后,代码创建了一个多重离散动作空间md_space,使用MultiDiscrete类来定义动作空间的维度和范围。在这个示例中,md_space是一个二维的动作空间,第一个维度有2个动作,第二个维度有3个动作,总共有6个动作。

MultiDiscreteEnv类

class MultiDiscreteEnv(gym.Wrapper):
    """Map the actions from the factorized action spaces to the original single action space.

    :param gym.Env env: the environment to wrap.
    :param list action_shape: dims of the the factorized action spaces.
    """

    def __init__(self, env, action_shape):
        super().__init__(env)
        self.action_shape = np.flip(np.cumprod(np.flip(np.array(action_shape))))

    def step(self, action):
        """
        Overview:
            Step the environment with the given factorized actions.
        Arguments:
            - action (:obj:`list`): a list contains the action output of each discrete dimension, e.g.: [1, 1] means 1 * 3 + 1 = 4 for a factorized action 2 * 3 = 6
        """
        action = action[0] * self.action_shape[1] + action[1]
        obs, reward, done, info = self.env.step(action)
        return obs, reward, done, info

这段代码定义了一个名为MultiDiscreteEnv的类,该类是gym.Wrapper的子类,用于将分解的动作空间映射到原始的单一动作空间。

构造函数init接受两个参数:env是要包装的环境对象,action_shape是分解动作空间的维度。

init方法中,首先调用了super().init(env)来调用父类gym.Wrapper的构造函数进行初始化。然后,使用np.flip和np.cumprod函数对action_shape进行处理,计算了一个用于映射分解动作到原始动作的形状数组self.action_shape。

step方法用于执行环境的一步操作。它接受一个参数action,该参数是一个列表,包含每个离散维度的动作输出。例如,[1, 1]表示对于一个分解动作2 * 3 = 6,其中第一个维度的动作是1,第二个维度的动作是1。

在step方法中,首先计算了一个合并的动作值action,通过将第一个维度的动作乘以第二个维度的大小,再加上第二个维度的动作。这样就将分解的动作映射回原始的单一动作空间。

然后,调用被包装的环境对象的step方法,传递映射后的动作值作为参数,获取观测值obs、奖励值reward、完成标志done和额外信息info。

最后,将观测值、奖励值、完成标志和额外信息作为结果返回。

所以当配置一个使用多重离散动作空间的实验时,有几个关键的配置步骤需要注意。

  • 将action_shape从一个整数改为一个列表,列表中包含了分解动作空间的维度。这个改变需要在config.policy.model和env.info()这两个地方进行配置。config.policy.model是模型配置信息的键,而env.info()是环境对象的方法,用于获取环境的信息。

  • 在配置文件的config.env中,将键multi_discrete的值设置为True。这个配置指示代码使用MultiDiscreteEnv包装器来处理多重离散动作空间。config.env是配置文件中用于指定环境配置的部分。

多重离散动作空间版本的Q-learning的forward_learn部分代码分析。

# ====================
# Q-learning forward
# ====================
self._learn_model.train()
self._target_model.train()
# Current q value (main model)
q_value = self._learn_model.forward(data['obs'])['logit']
# Target q value
with torch.no_grad():
    target_q_value = self._target_model.forward(data['next_obs'])['logit']
    # Max q value action (main model)
    target_q_action = self._learn_model.forward(data['next_obs'])['action']

value_gamma = data.get('value_gamma')
if isinstance(q_value, torch.Tensor):
    data_n = q_nstep_td_data(
        q_value, target_q_value, data['action'], target_q_action, data['reward'], data['done'], data['weight']
    )
    loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
else:
    action_num = len(q_value)
    loss, td_error_per_sample = [], []
    for i in range(action_num):
        td_data = q_nstep_td_data(
            q_value[i], target_q_value[i], data['action'][i], target_q_action[i], data['reward'], data['done'], data['weight']
        )
        loss_, td_error_per_sample_ = q_nstep_td_error(td_data, self._gamma, nstep=self._nstep)
        loss.append(loss_)
        td_error_per_sample.append(td_error_per_sample_.abs())
    loss = sum(loss) / (len(loss) + 1e-8)
    td_error_per_sample = sum(td_error_per_sample) / (len(td_error_per_sample) + 1e-8)

多重离散版本DQNMultiDiscretePolicy继承DQNPolicy并且只覆盖_forward_learn接口. 在这个多重离散动作空间版本的 Q-learning 的 _forward_learn 中,每层动作空间使用全局奖励计算自己的 q 值、动作和 td 损失,计算遵循与单个动作空间计算相同的过程。

self._learn_model.train()
self._target_model.train()

这两行代码将学习模型(self._learn_model)和目标模型(self._target_model)设置为训练模式,以确保在前向计算中启用梯度计算。

q_value = self._learn_model.forward(data['obs'])['logit']

这行代码使用学习模型(self._learn_model)对当前状态(data[‘obs’])进行前向计算,然后获取Q值(q_value)。

with torch.no_grad():
    target_q_value = self._target_model.forward(data['next_obs'])['logit']
    target_q_action = self._learn_model.forward(data['next_obs'])['action']

在这个代码块中,使用torch.no_grad()上下文管理器,禁用梯度计算。然后,对目标模型(self._target_model)和学习模型(self._learn_model)分别对下一个状态(data[‘next_obs’])进行前向计算。从目标模型中获取目标Q值(target_q_value),从学习模型中获取目标Q值所对应的动作(target_q_action)。

value_gamma = data.get('value_gamma')
if isinstance(q_value, torch.Tensor):
    data_n = q_nstep_td_data(
        q_value, target_q_value, data['action'], target_q_action, data['reward'], data['done'], data['weight']
    )
    loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
else:
    action_num = len(q_value)
    loss, td_error_per_sample = [], []
    for i in range(action_num):
        td_data = q_nstep_td_data(
            q_value[i], target_q_value[i], data['action'][i], target_q_action[i], data['reward'], data['done'], data['weight']
        )
        loss_, td_error_per_sample_ = q_nstep_td_error(td_data, self._gamma, nstep=self._nstep)
        loss.append(loss_)
        td_error_per_sample.append(td_error_per_sample_.abs())
    loss = sum(loss) / (len(loss) + 1e-8)
    td_error_per_sample = sum(td_error_per_sample) / (len(td_error_per_sample) + 1e-8)

这部分代码根据q_value的类型来进行不同的处理。如果q_value是一个torch.Tensor,说明只有一个动作空间。

在这种情况下,代码会调用q_nstep_td_data()函数来计算TD数据(data_n),其中包括当前状态的Q值(q_value)、目标Q值(target_q_value)、动作(data[‘action’])、目标Q值所对应的动作(target_q_action)、奖励(data[‘reward’])、终止标志(data[‘done’])和权重(data[‘weight’])。

然后,调用q_nstep_td_error()函数,使用TD数据(data_n)计算TD误差(td_error_per_sample),并将其与损失(loss)一起返回。

如果q_value不是torch.Tensor,那么说明有多个动作空间。在这种情况下,代码会遍历每个动作空间,分别计算每个动作空间的TD数据和TD误差。然后,对每个动作空间的损失进行求和并计算平均损失(loss),对每个动作空间的TD误差进行求和并计算平均TD误差(td_error_per_sample)。

最后,计算得到的损失和TD误差会被返回供后续使用。