强化学习(六) - 连续空间中的强化学习

      • 6.1 连续空间中的强化学习
      • 6.2 离散空间和连续空间

 

  • 6.3 离散化
    • 实例:小车上山
    • 6.3.1 相关程序

 

  • 6.3.2 程序注解
    • (1) 环境测试
    • (2) 离散化
  • (3) 模型训练
  • (4) 模型优化


6.1 连续空间中的强化学习


在之前的实例中,状态和动作的数量受到限制。使用小的,有限的马尔可夫决策过程(MDP),可以用表,字典或其他有限结构来表示动作价值函数。

例如,考虑下面的非常小的gridworld。假设世界有四个可能的状态,并且代理有四个可能的操作可供使用(上,下,左,右)。我们可以在表中表示估计的最佳操作价值函数,每个状态对应一个行,每个动作对应一个列。我们将此表称为Q表。

在这里插入图片描述

图6.1 Q表


但是,具有更大空间的MDP呢?考虑到Q表的每个状态必须有一行。因此,例如,如果有1000万个可能的状态,则Q表必须具有1000万行。此外,如果状态空间是连续的实数值的集合(无穷大),那么就不可能以有限的结构表示动作值。

强化学习算法通常分为两大类别: 基于模型的方法,例如策略迭代。以及需要已知转换和奖励模型的值迭代。它们本质上通过动态规划并使用该模型, 以迭代方式计算期望的价值函数和最优策略,另一方面 蒙特卡洛方法和时间差分学习等,不基于模型的方法不需要明确的模型。它们通过执行探索性动作对环境抽样,并使用获得的经验直接估计价价值函数。这就是强化学习的简单介绍 当然还有更多内容。

深度强化学习是一个相对较新的术语。 是指使用深度学习(主要是多层神经网络)解决强化学习问题的方法。 强化学习通常包含有限的 MDP,即状态和动作数量是有限的。 但是有太多的问题具有非常大的状态和动作空间,甚至由连续的实数组成。 传统算法使用表格或字典,或其他有限结构来记录状态和动作值,但是不再适合此类问题。 因此,我们首先要考虑的是,如何泛化这些算法以便适合大型连续空间。 这就为开发深度强化学习算法奠定了基础,包括深度 Q 学习等基于值的技巧,以及直接尝试优化策略的方法,例如策略梯度。 最后,会使用到结合这两类方法的更高级方法,即行动者-评论者方法。 

6.2 离散空间和连续空间


我们先来看看离散空间和连续空间的含义。回忆下马尔可夫决策流程的定义, 我们假设任何时间的环境状态来自于一组潜在状态,当该组合是有限组合时,我们可以将其称之为离散状态空间。动作也类似,如果有一组有限的动作,则表示环境有一个离散动作空间。

离散空间简化了问题,首先,使我们能够将任何状态和动作函数,表示为字典或查询表。假设有一个状态价值函数 V VV,它是从一组状态到实数的映射。如果将状态表示为整数,则可以将价值函数表示为字典 并将每个状态当做键。类似地, 假设有一个动作价值函数 Q,它将每个状态动作对映射到一个实数。同样 我们可以使用字典或将价值函数存储为表格或矩阵,每行对应一个状态,每列对应一个动作。

在这里插入图片描述

图6.2 价值迭代

离散空间对很多强化学习算法来说也很关键,例如, 在价值迭代中,这个内部 for 循环逐个遍历每个状态,并更新相应的估值V ( s ) V(s)V(s)。如果状态空间是连续的 则不可能这么操作,循环将永远持续下去,甚至对于有很多状态的离散状态空间来说, 这一流程也很快变得不可行,Q 学习等不基于模型的方法也需要离散空间。这里 我们对状态 S ′ 的所有潜在动作执行 max 运算。如果有一组有限的动作, 则很轻松。但是如果动作空间是连续的,这个小小的计算步骤本身就会变成完全失败的优化问题。那么连续空间到底是什么意思?

连续是离散的反义词。连续空间并不限定于一组独特的值, 例如整数。相反, 它可以是一定范围的值, 通常是实数。这意味着状态值等量值。例如表示离散情况的条形图,每个状态对应一个长条。现在需要转换成预期范围内的密度图。同一记法也扩展到了环境中,状态不再是单个实数 而是一个此类数字的向量。这样依然称之为连续空间 只是不再是一个维度。

在继续深入讲解之前,我们尝试了解下为何连续状态空间很重要。它们来自何处?思考一个高级决策制定任务, 例如下棋。经常可以将一组潜在状态看做离散状态。每个棋子都在棋盘上的哪个方框内。我们不需要精确地确定,每个棋子处在方框内的哪个位置或朝着哪个方向。虽然我们也可以了解这些细节信息,并思考, 为何你的骑士在瞪着我的王后。但是这些信息与要解决的问题不相关,我们可以从游戏模型中删除这些信息。通常, 网格环境在强化学习中非常热门。它们使我们能够直观地查看智能体在空间环境中的行为如何,但是现实的物理空间并不能始终清晰地划分为网格。

在这里插入图片描述

图6.3 玩飞镖的机器人

动作也可以是连续的动作。例如玩飞镖的机器人,它必须设置扔飞镖的高度和角度。选择相应级别的扔力。即使这些值出现小小的变化,也会对飞镖最终落在板上的位置有很大的影响。通常。 需要在物理环境中,采取的大多数动作本质上都是连续动作。很明显。 我们需要修改表示法或算法,或者同时修改二者以便处理连续空间。我们将讨论的两个主要策略是,离散化和函数逼近。

6.3 离散化顾名思义,离散化就是将连续空间转换为离散空间。对于某些环境, 离散化状态空间效果很好,使我们能够几乎不加修改就能使用现有的算法。动作也可以离散化,例如, 角可以拆分为完整度数。甚至按 90 度递增。

在这里插入图片描述

图6.4 扫地机器人离散化环境如果合适的话,现在假设一个离散化环境中有一些物体, 机器人需要绕过这些障碍物。对于网格表示法,我们只能标记存在物体的单元格 甚至稍微超出范围,称之为占据网格。

但是如果我们以较大网格离散化,可能会使智能体以为,没有绕过这些障碍物以抵达目标位置的道路。如果我们能够根据这些障碍物调整网格,那么可以为智能体找到一条潜在的道路。另一种方法是在需要时将网格拆分为更小的单元格,依然是逼近结果。但是可以让我们在需要的位置分配更多的状态表示,这样比将整个状态空间拆分为更小的单元格合适。拆分整个状态空间可能会增加状态的总数,进而增加计算值函数所需的时间。

在这里插入图片描述

图6.5 在不同挡位速度的汽车油耗


这种离散化适合网格世界等空间领域,但是其他状态空间呢?我们来看看汽车换挡这个不同领域的问题。如今的大部分汽车都会自动换挡, 汽车如何决定切换到哪一挡以及何时换挡?这个图表简单地描述了对于一辆普通汽车,油耗如何随着不同挡位速度的变化而变化。假设状态仅包含车辆速度,以及当前挡位,奖励与油耗成反比,智能体可以采取的动作包括换到更高挡位或更低挡位。虽然速度是连续值,但是可以离散化为不同的范围。最佳划分方式是一个挡位对应一个速度范围,注意这些范围可以具有不同的长度,即离散化是不均匀的。如果状态空间还有其他维度, 例如油门位置,那么它们也可以不均匀地细分。

 

实例:小车上山


小车上山(MountainCar-v0)是一个经典的控制问题.如图6-6所示,小车在一段范围内行驶.在任意时刻,在水平方向看小车的位置范围是[-1.2, 0.6],速度的范围是[-0.07, 0.07].在每个时刻,智能体可以对小车施加3种动作中的一种, 向左施力, 不实例, 向右施力.智能体施力和小车的水平位置会共同决定小车 下一个时刻的速度.当某时刻小车的水平位置大于0.5时,控制目标成功达成,回合结束.控制的目标是让小车尽可能少的步骤达到目标.一般认为,如果智能体在连续100个回合中的平均步数≤ 100 \leq 100≤100,就认为问题解决了. 

在绝大多数情况下,智能体简单的向右施力并不足以让小车成功越过目标.

 本节假设智能体并不知道环境确定小车位置和速度的数学表达式.实际上,小车位置和速度是有数学表达式的.记t tt时刻(t = 0 , 1 , 2 , . . . ),小车的位置为X t ( X t ∈ [ − 1.2 , 0.6 ] ) , 速度为V t ( V t ∈ [ − 0.07 , 0.07 ] ) ,智能体施力为A t ∈ { 0 , 1 , 2 } , 初始状态X 0 ∈ [ − 0.6 , − 0.4 ) , V 0 = 0 .

 在这里插入图片描述

图6.6 小车上山问题

 6.3.1 相关程序


首先我们建立关于Agent的Q学习类
K6_MountainCar_Agent

import numpy as np


# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
    grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
    print("Uniform grid: [, ] /  => ")
    for l, h, b, splits in zip(low, high, bins, grid):
        print("    [{}, {}] / {} => {}".format(l, h, b, splits))
    return grid


# 根据给定的网格离散样本。
def discretize(sample, grid):
    return list(int(np.digitize(s, g)) for s, g in zip(sample, grid))  # 返回索引值


class QLearningAgent:
    """Q-Learning agent,,通过离散化可以作用于连续的状态空间。"""

    def __init__(self, env, state_grid, alpha=0.02, gamma=0.99,
                 epsilon=1.0, epsilon_decay_rate=0.9995, min_epsilon=.01, seed=505):
        """初始化变量,创建离散化网格。"""
        # Environment info
        self.env = env
        self.state_grid = state_grid
        self.state_size = tuple(len(splits) + 1 for splits in self.state_grid)  # n-维状态空间
        self.action_size = self.env.action_space.n  # 1-维离散动作空间
        self.seed = np.random.seed(seed)
        print("Environment:", self.env)
        print("State space size:", self.state_size)
        print("Action space size:", self.action_size)

        # 学习模型参数
        self.alpha = alpha  # 学习率
        self.gamma = gamma  # 折扣因子
        self.epsilon = self.initial_epsilon = epsilon  # 初始探索率
        self.epsilon_decay_rate = epsilon_decay_rate  # epsilon衰减系数
        self.min_epsilon = min_epsilon

        # 创建Q表
        self.q_table = np.zeros(shape=(self.state_size + (self.action_size,)))
        print("Q table size:", self.q_table.shape)

    def preprocess_state(self, state):
        """将连续状态映射到它的离散表示。"""
        return tuple(discretize(state, self.state_grid))

    def reset_episode(self, state):
        """为新的事件重置变量."""
        # 逐步降低探索率
        self.epsilon *= self.epsilon_decay_rate
        self.epsilon = max(self.epsilon, self.min_epsilon)

        # 决定初始行动
        self.last_state = self.preprocess_state(state)
        self.last_action = np.argmax(self.q_table[self.last_state])
        return self.last_action

    def reset_exploration(self, epsilon=None):
        """重置训练时使用的探索率."""
        self.epsilon = epsilon if epsilon is not None else self.initial_epsilon

    def act(self, state, reward=None, done=None, mode='train'):
        """选择next操作并更新内部Q表 (when mode != 'test')."""
        state = self.preprocess_state(state)
        if mode == 'test':
            # 测试模式:简单地产生一个动作
            action = np.argmax(self.q_table[state])
        else:
            # 训练模式(默认):更新Q表,选择下一步行动
            # Note: 我们用当前状态,回报更新最后的状态动作对的Q表条目
            self.q_table[self.last_state + (self.last_action,)] += self.alpha * \
                                                                   (reward + self.gamma * max(self.q_table[state]) -
                                                                    self.q_table[self.last_state + (self.last_action,)])

            # 探索 vs. 利用
            do_exploration = np.random.uniform(0, 1) < self.epsilon
            if do_exploration:
                # 随机选择一个动作
                action = np.random.randint(0, self.action_size)
            else:
                # 从Q表中选择最佳动作
                action = np.argmax(self.q_table[state])

        # 存储当前状态,下一步操作
        self.last_state = state
        self.last_action = action
        return action

主程序K6_MountainCar如下

import sys
import gym
import numpy as np

import matplotlib.collections as mc
import pandas as pd
import matplotlib.pyplot as plt
from K6_MountainCar_Agent import QLearningAgent


# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
    grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
    print("Uniform grid: [, ] /  => ")
    for l, h, b, splits in zip(low, high, bins, grid):
        print("    [{}, {}] / {} => {}".format(l, h, b, splits))
    return grid


# 根据给定的网格离散样本。
def discretize(sample, grid):
    return list(int(np.digitize(s, g)) for s, g in zip(sample, grid))  # 返回索引值


# 在给定的二维网格上可视化原始的和离散的样本。
def visualize_samples(samples, discretized_samples, grid, low=None, high=None):
    fig, ax = plt.subplots(figsize=(10, 10))

    # 显示网格
    ax.xaxis.set_major_locator(plt.FixedLocator(grid[0]))
    ax.yaxis.set_major_locator(plt.FixedLocator(grid[1]))
    ax.grid(True)

    # 如果指定了边界(低、高),则使用它们来设置轴的限制
    if low is not None and high is not None:
        ax.set_xlim(low[0], high[0])
        ax.set_ylim(low[1], high[1])
    else:
        # 否则使用第一个、最后一个网格位置为low、high(为了进一步映射离散化的样本)
        low = [splits[0] for splits in grid]
        high = [splits[-1] for splits in grid]

    # 将每个离散的样本(实际上是一个索引)映射到相应网格单元格的中心
    grid_extended = np.hstack((np.array([low]).T, grid, np.array([high]).T))  # add low and high ends
    grid_centers = (grid_extended[:, 1:] + grid_extended[:, :-1]) / 2  # compute center of each grid cell
    locs = np.stack(grid_centers[i, discretized_samples[:, i]] for i in range(len(grid))).T  # map discretized samples

    ax.plot(samples[:, 0], samples[:, 1], 'o')  # 绘制初始样本
    ax.plot(locs[:, 0], locs[:, 1], 's')  # 绘制离散后的样本
    ax.add_collection(mc.LineCollection(list(zip(samples, locs)),
                                        colors='orange'))  # 添加一条线连接每个原始离散样本
    ax.legend(['original', 'discretized'])


def run(agent, env, num_episodes=20000, mode='train'):
    """给定的强化学习环境中运行agent并返回分数."""
    scores = []
    max_avg_score = -np.inf
    for i_episode in range(1, num_episodes + 1):
        # 初始化事件
        state = env.reset()
        action = agent.reset_episode(state)
        total_reward = 0
        done = False

        # 运行步骤直到完成
        while not done:
            state, reward, done, info = env.step(action)
            total_reward += reward
            action = agent.act(state, reward, done, mode)

        # 保存最终分数
        scores.append(total_reward)

        # 输出事件状态
        if mode == 'train':
            if len(scores) > 100:
                avg_score = np.mean(scores[-100:])
                if avg_score > max_avg_score:
                    max_avg_score = avg_score
            if i_episode % 100 == 0:
                print("\rEpisode {}/{} | Max Average Score: {}".format(i_episode, num_episodes, max_avg_score), end="")
                sys.stdout.flush()

    return scores


def plot_scores(scores, rolling_window=100):
    """Plot scores and optional rolling mean using specified window."""
    plt.plot(scores)
    plt.title("Scores")
    rolling_mean = pd.Series(scores).rolling(rolling_window).mean()
    plt.plot(rolling_mean)
    return rolling_mean


def plot_q_table(q_table):
    """Visualize max Q-value for each state and corresponding action."""
    q_image = np.max(q_table, axis=2)       # max Q-value for each state
    q_actions = np.argmax(q_table, axis=2)  # best action for each state

    fig, ax = plt.subplots(figsize=(10, 10))
    cax = ax.imshow(q_image, cmap='jet')
    cbar = fig.colorbar(cax)
    for x in range(q_image.shape[0]):
        for y in range(q_image.shape[1]):
            ax.text(x, y, q_actions[x, y], color='white',
                    horizontalalignment='center', verticalalignment='center')
    ax.grid(False)
    ax.set_title("Q-table, size: {}".format(q_table.shape))
    ax.set_xlabel('position')
    ax.set_ylabel('velocity')


# main function
if __name__ == "__main__":
    # 创建一个环境并设置随机种子
    env = gym.make("MountainCar-v0")
    env.seed(505)

    #  环境测试
    env_test = False
    if env_test is True:
        state = env.reset()
        score = 0
        for t in range(200):
            action = env.action_space.sample()
            env.render()
            state, reward, done, _ = env.step(action)
            score += reward
            if done:
                break
        print("Final score:", score)
        env.close()

        # Explore state (observation) space
        print("State space:", env.observation_space)
        print("- low:", env.observation_space.low)
        print("- high:", env.observation_space.high)

        print("State space samples:")
        print(np.array([env.observation_space.sample() for i in range(10)]))

        # action space
        print("Action space:", env.action_space)

        # 从动作空间生成一些示例
        print("Action space samples:")
        print(np.array([env.action_space.sample() for i in range(10)]))

    state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))
    q_agent = QLearningAgent(env, state_grid)

    # 以不同模式运行,方便测试结果
    run_mode = True
    if run_mode is True:
        q_agent.q_table = np.load('q_table.npy', allow_pickle=True)
        state = env.reset()
        score = 0
        for t in range(200):
            action = q_agent.act(state, mode='test')
            env.render()
            state, reward, done, _ = env.step(action)
            score += reward
            if done:
                break
        print('Final score:', score)
        env.close()
    else:
        scores = run(q_agent, env)

        # plot data
        plt.plot(scores)
        plt.title("Scores")
        rolling_mean = plot_scores(scores)
        plt.show()

        test_scores = run(q_agent, env, num_episodes=100, mode='test')
        print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))
        _ = plot_scores(test_scores)

        plot_q_table(q_agent.q_table)
        plt.show()
        np.save('q_table.npy', q_agent.q_table)

6.3.2 程序注解

(1) 环境测试

首先来看一下MountainCar这个环境条件,在主程序K6_MountainCar中,可以通过更改env_test的值,使其为True来进行环境的测试.输出如下

Final score: -200.0
State space: Box(2,)
- low: [-1.2  -0.07]
- high: [0.6  0.07]
State space samples:
[[-0.00739189 -0.01564005]
 [-0.4523059   0.04406008]
 [-0.1716276  -0.04345753]
 [-0.22001167  0.01072859]
 [-1.1200054  -0.05656335]
 [-0.91125274 -0.05465209]
 [-0.38469937 -0.06611647]
 [ 0.15009743  0.03064202]
 [-1.1663655  -0.06613734]
 [-0.352857   -0.01548356]]
Action space: Discrete(3)
Action space samples:
[1 1 2 2 1 1 1 1 0 0]

(2) 离散化

   # 将观测空间离散化,其中bins控制离散精度
    state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))



这个部分的作用是将观测空间进行离散化,借助create_uniform_grid()函数

# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
    grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
    print("Uniform grid: [, ] /  => ")
    for l, h, b, splits in zip(low, high, bins, grid):
        print("    [{}, {}] / {} => {}".format(l, h, b, splits))
    return grid



其中bins定义了离散化的精度

Uniform grid: [, ] /  => 
    [-1.2000000476837158, 0.6000000238418579] / 10 => [-1.02000004 -0.84000003 -0.66000003 -0.48000002 -0.30000001 -0.12
  0.06        0.24000001  0.42000002]
    [-0.07000000029802322, 0.07000000029802322] / 10 => [-0.056 -0.042 -0.028 -0.014  0.     0.014  0.028  0.042  0.056]



(3) 模型训练

q_agent = QLearningAgent(env, state_grid)
    # 以不同模式运行,方便测试结果
    run_mode = False
    # 运行测试模式
    if run_mode is True:
        q_agent.q_table = np.load('q_table.npy', allow_pickle=True)
        state = env.reset()
        score = 0
        for t in range(200):
            action = q_agent.act(state, mode='test')
            env.render()
            state, reward, done, _ = env.step(action)
            score += reward
            if done:
                break
        print('Final score:', score)
        env.close()
    # 训练模式
    else:
        scores = run(q_agent, env)

        # plot data
        plt.plot(scores)
        plt.title("Scores")
        rolling_mean = plot_scores(scores)
        plt.show()

        test_scores = run(q_agent, env, num_episodes=100, mode='test')
        print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))
        _ = plot_scores(test_scores)

        plot_q_table(q_agent.q_table)
        plt.show()

在这里提供了两种运行模式,当run_mode = False时,为训练模式. 此时会进行模型训练,相关训练过程的输出如下

--Agent--
Environment: 
State space size: (10, 10)
Action space size: 3
Q table size: (10, 10, 3)
Episode 20000/20000 | Max Average Score: -130.49[TEST] Completed 100 episodes with avg. score = -146.91



可视化q表的输出:

在这里插入图片描述
分数和平均分数输出:

在这里插入图片描述

可以看到平均分数随着训练轮数的增加而增加.

同时,我们也可以在test模式下运行模型,来分析和观测所获得的分数,输出如下,

在这里插入图片描述


在训练完成之后,程序会将Q表保存到q_table.npy文件中,方便之后的使用.

当run_mode = True 时,程序以运行模式进行.程序会从文件中读取训练后的q_表的值,运行完之后,程序自动退出.

在这里插入图片描述


(4) 模型优化
之前在环境介绍中说过,小车上山问题在分数小于100时可以认为解决了.在之前的模型中,我们的平均分数保持在-150左右,我们可以通过细化离散度来获得更好的分数.我们将bins更改为(20, 20),并将训练轮数调整到50000轮.运行程序分数和平均分数输出:

在这里插入图片描述

test模式输出:

在这里插入图片描述

可以看到分数有了一定的提升.

在这里插入图片描述


将run_mode设置成True即可查看效果.