Deep Q Network

Value-based Sequential Decision

Principle

  1. 抛弃Q表这种Q值记录方式,使用神经网络生成Q值,在状态较多的情况下格外有效率

2.

    • Q估计:通过NN预测出的 [公式] 的最大值
    • Q现实:Q 估计中最大值的动作来换取环境中的奖励 reward+ [公式] 下一步 [公式] 中通过NN预测出的 [公式] 的最大值

3. DQN两大利器

  • Experience replay: 作为一种离线学习,每次 DQN 更新的时候,我们都可以随机抽取一些之前的经历进行学习。随机抽取这种做法打乱了经历之间的相关性,也使得神经网络更新更有效率。
  • Fixed Q-target: 在 DQN 中使用到两个结构相同但参数不同的神经网络, 预测 Q 估计的神经网络具备最新的参数, 而预测 Q 现实的神经网络使用的参数则是很久以前的.

4. 算法 [公式]

  • 记忆库 (用于重复学习)
  • 神经网络计算 Q 值
  • 暂时冻结 q_target 参数 (切断相关性)

Implement

class Network(nn.Module):
    def __init__(self, obs_space, act_space):
        super(Network, self).__init__()
        self.fc1 = nn.Linear(obs_space, 12)
        self.relu = nn.ReLU()
        self.out = nn.Linear(12, act_space)

    def forward(self, s):
        x = self.fc1(s)
        x = self.relu(x)
        Q_value = self.out(x)
        return Q_value

class Skylark_DQN():
    def __init__(self, env, alpha = 0.1, gamma = 0.6, epsilon=0.1, update_freq = 200):
        self.obs_space = env.observation_space.n
        self.act_space = env.action_space.n
        self.eval_net = Network(self.obs_space, self.act_space)
        self.target_net = Network(self.obs_space, self.act_space)
        self.env = env
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount rate
        self.epsilon = epsilon  # epsilon-greedy 
        self.buffer_size = 1000 # total size of replay/memory buffer
        self.traj_count = 0   # count of trajectories in buffer
        self.replay_buffer = np.zeros((self.buffer_size, self.obs_space*2+2))
        self.total_step = 0
        self.update_freq = update_freq # Freq of target update

    def choose_action(self, state):
        state = torch.unsqueeze(torch.FloatTensor(state), 0)
        if np.random.uniform(0, 1) < self.epsilon:
            action = self.env.action_space.sample()     # Explore action space
        else:
            action = torch.argmax(self.eval_net(state)).numpy() # Exploit learned values
        return action

    def store_trajectory(self, s, a, r, s_):
        traj = np.hstack((s,[a,r],s_))
        index = self.traj_count % self.buffer_size # 记忆多了就覆盖旧数据
        self.replay_buffer[index, :] = traj
        self.traj_count += 1

    def learn(self, batch_size=128):
        if self.total_step % self.update_freq == 0:
             self.target_net.load_state_dict(self.eval_net.state_dict())

        # 抽取记忆库中的批数据
        sample_index = np.random.choice(self.buffer_size, batch_size)
        b_memory = self.replay_buffer[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :self.obs_space])
        b_a = torch.LongTensor(b_memory[:, self.obs_space:self.obs_space+1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, self.obs_space+1:self.obs_space+2])
        b_s_ = torch.FloatTensor(b_memory[:, -self.obs_space:])

        # 针对做过的动作b_a, 来选 q_eval 的值, (q_eval 原本有所有动作的值)
        q_eval = self.eval_net(b_s).gather(1, b_a)  # shape (batch, 1)
        q_next = self.target_net(b_s_).detach()     # q_next 不进行反向传递误差, 所以 detach
        q_target = b_r + self.gamma * q_next.max(1)[0]   # shape (batch, 1)
        loss = nn.MSELoss()(q_eval, q_target)

        # 计算, 更新 eval net
        optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.alpha)    # torch 的优化器
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    def train(self, num_episodes, batch_size = 128, num_steps = 100):
        for i in range(num_episodes):
            state = self.env.reset()

            steps, penalties, reward, sum_rew = 0, 0, 0, 0
            done = False
            while not done and steps < num_steps:
                action = self.choose_action(state)
                # Interaction with Env
                next_state, reward, done, info = self.env.step(action) 

                self.store_trajectory(state, action, reward, next_state)
                if self.traj_count > self.buffer_size:
                    self.learn(batch_size)

                if reward == -10:
                    penalties += 1

                sum_rew += reward
                state = next_state
                steps += 1
                self.total_step += 1
            print('Episode: {} | Avg_reward: {} | Length: {}'.format(i, sum_rew/steps, steps))
        print("Training finished.")

更多实现方式见本专栏关联Github

Reference

  1. 什么是 DQN