The cross-entropy method (CEM) is a Monte Carlo method for importance sampling and optimization. It is applicable to both combinatorial and continuous problems, with either static or noisy objectives [1].
The method approximates the optimal importance-sampling estimator by repeating two phases:

  1. Draw samples from a probability distribution.
  2. Minimize the cross-entropy between this distribution and a target distribution so that better samples are generated in the next iteration (a sketch of one such iteration follows below).
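
For a Gaussian sampling distribution, minimizing the cross-entropy to the empirical distribution of the best ("elite") samples reduces to refitting the Gaussian's mean and standard deviation to those samples. Below is a minimal sketch of one such iteration, assuming an arbitrary scoring function named objective; this snippet is only an illustration and is not part of the MountainCar code later in the post.

import numpy as np

def cem_step(objective, mean, std, pop_size=50, elite_frac=0.2):
    # Phase 1: draw candidate solutions from the current Gaussian distribution.
    samples = mean + std * np.random.randn(pop_size, mean.size)
    scores = np.array([objective(x) for x in samples])
    # Phase 2: keep the top-scoring "elite" candidates and refit the distribution to them;
    # for a Gaussian family this maximum-likelihood refit is exactly the cross-entropy minimization.
    elite = samples[scores.argsort()[-int(pop_size * elite_frac):]]
    return elite.mean(axis=0), elite.std(axis=0)

The MountainCar example later in the post uses a simplified variant: it keeps sigma fixed and only re-centers the mean on the elite average.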

Reuven Rubinstein developed the method in the context of rare event simulation, where extremely small probabilities must be estimated, for example in network reliability analysis, queueing models, or the performance analysis of telecommunication systems. The method has also been applied to the traveling salesman problem, the quadratic assignment problem, DNA sequence alignment, the max-cut problem, and the buffer allocation problem. John Schulman has said that, as an evolutionary algorithm for optimizing parameterized policies, it works "embarrassingly well" on complex RL problems [2].

How does this method work?

Suppose you don't know what an agent, an environment, or a policy is. You are simply handed a "black box" that takes some numbers as input and outputs some other numbers. You can only choose the input values and observe the outputs. How do you guess inputs that make the outputs take the values you want?

A simple approach is to try a large batch of inputs, look at the outputs they produce, keep the inputs that produced the best outputs, and keep adjusting them until you are satisfied with the outputs you see. That is essentially what the cross-entropy method does.
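
In the RL setting, the black box maps a vector of policy parameters to the total reward of one episode. A hypothetical wrapper might look like this (run_one_episode is a placeholder; the real version of this interface is the Agent.evaluate method further down):

def black_box(weights):
    # input: a flat vector of policy parameters; output: the total reward of one episode
    agent.set_weights(weights)           # load the candidate parameters into the policy network
    return run_one_episode(agent, env)   # placeholder: roll out one episode and sum the rewards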

So, how do I use it to solve my RL problem?

Let's walk through how CEM works step by step with an example. To better understand the implementation, we will work through it in code below [3].

We will use the cross-entropy method to train an agent on gym's continuous mountain car environment (MountainCarContinuous-v0).

import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

env = gym.make('MountainCarContinuous-v0')
env.seed(101)
np.random.seed(101)

print('observation space:', env.observation_space)
print('action space:', env.action_space)
print('  - low:', env.action_space.low)
print('  - high:', env.action_space.high)

class Agent(nn.Module):
    def __init__(self, env, h_size=16):
        super(Agent, self).__init__()
        self.env = env
        # state, hidden layer, action sizes
        self.s_size = env.observation_space.shape[0]
        self.h_size = h_size
        self.a_size = env.action_space.shape[0]
        # define layers
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)

    def set_weights(self, weights):
        s_size = self.s_size
        h_size = self.h_size
        a_size = self.a_size
        # separate the weights for each layer
        fc1_end = (s_size*h_size)+h_size
        fc1_W = torch.from_numpy(weights[:s_size*h_size].reshape(s_size, h_size))
        fc1_b = torch.from_numpy(weights[s_size*h_size:fc1_end])
        fc2_W = torch.from_numpy(weights[fc1_end:fc1_end+(h_size*a_size)].reshape(h_size, a_size))
        fc2_b = torch.from_numpy(weights[fc1_end+(h_size*a_size):])
        # set the weights for each layer
        self.fc1.weight.data.copy_(fc1_W.view_as(self.fc1.weight.data))
        self.fc1.bias.data.copy_(fc1_b.view_as(self.fc1.bias.data))
        self.fc2.weight.data.copy_(fc2_W.view_as(self.fc2.weight.data))
        self.fc2.bias.data.copy_(fc2_b.view_as(self.fc2.bias.data))

    def get_weights_dim(self):
        return (self.s_size+1)*self.h_size + (self.h_size+1)*self.a_size

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x.cpu().data

    def evaluate(self, weights, gamma=1.0, max_t=5000):
        # roll out one episode with the given flat weight vector and return the discounted episode return
        self.set_weights(weights)
        episode_return = 0.0
        state = self.env.reset()
        for t in range(max_t):
            state = torch.from_numpy(state).float().to(device)
            action = self.forward(state)
            state, reward, done, _ = self.env.step(action)
            episode_return += reward * math.pow(gamma, t)
            if done:
                break
        return episode_return

agent = Agent(env).to(device)
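
CEM treats all of the network's weights and biases as a single flat parameter vector. For MountainCarContinuous-v0 the observation has 2 dimensions and the action 1, so with the default h_size=16 the vector has (2+1)*16 + (16+1)*1 = 65 entries. A quick illustrative check (not part of the training code):

# weights and biases of fc1 and fc2, flattened into one vector
print(agent.get_weights_dim())  # expected: (2+1)*16 + (16+1)*1 = 65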

def cem(n_iterations=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """PyTorch implementation of the cross-entropy method.

    Params
    ======
        n_iterations (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate
        print_every (int): how often to print average score (over last 100 episodes)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise
    """
    n_elite=int(pop_size*elite_frac)

    scores_deque = deque(maxlen=100)
    scores = []
    best_weight = sigma*np.random.randn(agent.get_weights_dim())

    for i_iteration in range(1, n_iterations+1):
        # sample a population of candidate weight vectors around the current best estimate
        weights_pop = [best_weight + (sigma*np.random.randn(agent.get_weights_dim())) for i in range(pop_size)]
        # evaluate each candidate by running one episode and recording its return
        rewards = np.array([agent.evaluate(weights, gamma, max_t) for weights in weights_pop])

        # keep the top-performing "elite" candidates and re-center the search on their mean
        elite_idxs = rewards.argsort()[-n_elite:]
        elite_weights = [weights_pop[i] for i in elite_idxs]
        best_weight = np.array(elite_weights).mean(axis=0)

        reward = agent.evaluate(best_weight, gamma=1.0)
        scores_deque.append(reward)
        scores.append(reward)

        torch.save(agent.state_dict(), 'checkpoint.pth')

        if i_iteration % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))

        if np.mean(scores_deque)>=90.0:
            print('\nEnvironment solved in {:d} iterations!\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))
            break
    return scores

scores = cem()

# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(1, len(scores)+1), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()


# load the weights from file
agent.load_state_dict(torch.load('checkpoint.pth'))

state = env.reset()
while True:
    state = torch.from_numpy(state).float().to(device)
    with torch.no_grad():
        action = agent(state)
    env.render()
    next_state, reward, done, _ = env.step(action)
    state = next_state
    if done:
        break

env.close()

Let's take a look at the training process.

It converges quickly.

The cross-entropy method is a simple algorithm that can be used to train reinforcement learning agents. It has also achieved good results on the game of Tetris [4]. The cross-entropy method can serve as a useful baseline.

References
[1] https://en.wikipedia.org/wiki/Cross-entropy_method#cite_note-1
[2] MLSS 2016 on Deep Reinforcement Learning by John Schulman (https://www.youtube.com/watch?v=aUrX-rP_ss4)
[3] https://github.com/udacity/deep-reinforcement-learning/tree/master/cross-entropy
[4] Learning Tetris Using the Noisy Cross-Entropy Method, https://direct.mit.edu/neco/article/18/12/2936-2941/7108